You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/07/01 08:04:11 UTC
[3/3] apex-malhar git commit: APEXMALHAR-1953: Added
JdbcPOJONonInsertOutputOperator for update / delete / merge queries. Added
error port and error tuple handling to
AbstractJdbcTransactionableOutputOperator, Added Autometrics,
Added Unit tests and Refactored accordingly
APEXMALHAR-1953: Added JdbcPOJONonInsertOutputOperator for update / delete / merge queries. Added error port and error tuple handling to AbstractJdbcTransactionableOutputOperator, Added Autometrics, Added Unit tests and Refactored accordingly
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ddd5bcf1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ddd5bcf1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ddd5bcf1
Branch: refs/heads/master
Commit: ddd5bcf1a80d22d0e2727868339757dcce3bb2b8
Parents: f54ba32
Author: bhupesh <bh...@gmail.com>
Authored: Thu Mar 17 18:24:08 2016 +0530
Committer: bhupesh <bh...@gmail.com>
Committed: Fri Jul 1 11:53:43 2016 +0530
----------------------------------------------------------------------
.../db/jdbc/AbstractJdbcPOJOOutputOperator.java | 29 ++--
...stractJdbcTransactionableOutputOperator.java | 77 ++++++++-
.../datatorrent/lib/db/jdbc/JdbcFieldInfo.java | 58 +++++++
.../db/jdbc/JdbcPOJOInsertOutputOperator.java | 19 ++-
.../jdbc/JdbcPOJONonInsertOutputOperator.java | 76 +++++++++
.../com/datatorrent/lib/db/jdbc/JdbcIOApp.java | 15 +-
.../lib/db/jdbc/JdbcOperatorTest.java | 171 ++++++++++++++++++-
7 files changed, 411 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
index c310a40..45e0cbb 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
@@ -38,7 +38,6 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Getter;
import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
@@ -61,7 +60,7 @@ import com.datatorrent.lib.util.PojoUtils.GetterShort;
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
{
- private List<FieldInfo> fieldInfos;
+ private List<JdbcFieldInfo> fieldInfos;
protected List<Integer> columnDataTypes;
@@ -142,15 +141,15 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
break;
case Types.TIMESTAMP:
- statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setTimestamp(i + 1, ((Getter<Object, Timestamp>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case Types.TIME:
- statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setTime(i + 1, ((Getter<Object, Time>)activeFieldInfo.setterOrGetter).get(tuple));
break;
case Types.DATE:
- statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+ statement.setDate(i + 1, ((Getter<Object, Date>)activeFieldInfo.setterOrGetter).get(tuple));
break;
default:
@@ -169,7 +168,7 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
/**
* A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
*/
- public List<FieldInfo> getFieldInfos()
+ public List<JdbcFieldInfo> getFieldInfos()
{
return fieldInfos;
}
@@ -182,7 +181,7 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
* @description $[].pojoFieldExpression pojo field name or expression
* @useSchema $[].pojoFieldExpression input.fields[].name
*/
- public void setFieldInfos(List<FieldInfo> fieldInfos)
+ public void setFieldInfos(List<JdbcFieldInfo> fieldInfos)
{
this.fieldInfos = fieldInfos;
}
@@ -195,6 +194,10 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
return tablename;
}
+ /**
+ * Set the target table name in database
+ * @param tablename
+ */
public void setTablename(String tablename)
{
this.tablename = tablename;
@@ -259,18 +262,18 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
break;
case Types.TIMESTAMP:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), Timestamp.class);
break;
case Types.TIME:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), Time.class);
break;
case Types.DATE:
- activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
- activeFieldInfo.fieldInfo.getPojoFieldExpression());
+ activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+ activeFieldInfo.fieldInfo.getPojoFieldExpression(), Date.class);
break;
default:
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
index d3300fc..0a7f3fd 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
@@ -18,8 +18,10 @@
*/
package com.datatorrent.lib.db.jdbc;
+import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.List;
import javax.validation.constraints.Min;
@@ -28,10 +30,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
-
+import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
/**
@@ -69,6 +73,14 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
private transient int batchStartIdx;
private transient PreparedStatement updateCommand;
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<T> error = new DefaultOutputPort<>();
+
+ @AutoMetric
+ private int tuplesWrittenSuccessfully;
+ @AutoMetric
+ private int errorTuples;
+
public AbstractJdbcTransactionableOutputOperator()
{
tuples = Lists.newArrayList();
@@ -95,6 +107,14 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
}
@Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ tuplesWrittenSuccessfully = 0;
+ errorTuples = 0;
+ }
+
+ @Override
public void endWindow()
{
if (tuples.size() - batchStartIdx > 0) {
@@ -129,10 +149,47 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
}
updateCommand.executeBatch();
updateCommand.clearBatch();
+ batchStartIdx += tuples.size() - batchStartIdx;
+ } catch (BatchUpdateException bue) {
+ logger.error(bue.getMessage());
+ processUpdateCounts(bue.getUpdateCounts(), tuples.size() - batchStartIdx);
} catch (SQLException e) {
throw new RuntimeException("processing batch", e);
- } finally {
- batchStartIdx += tuples.size() - batchStartIdx;
+ }
+ }
+
+ /**
+ * Identify which commands in the batch failed and redirect these on the error port.
+ * See https://docs.oracle.com/javase/7/docs/api/java/sql/BatchUpdateException.html for more details
+ *
+ * @param updateCounts
+ * @param commandsInBatch
+ */
+ private void processUpdateCounts(int[] updateCounts, int commandsInBatch)
+ {
+ if (updateCounts.length < commandsInBatch) {
+ // Driver chose not to continue processing after failure.
+ error.emit(tuples.get(updateCounts.length + batchStartIdx));
+ errorTuples++;
+ // In this case, updateCounts.length is the number of statements that executed successfully
+ tuplesWrittenSuccessfully += updateCounts.length;
+ // Skip the error record
+ batchStartIdx += updateCounts.length + 1;
+ // And process the remaining if any
+ if ((tuples.size() - batchStartIdx) > 0) {
+ processBatch();
+ }
+ } else {
+ // Driver processed all batch statements in spite of failures.
+ // Pick out the failures and send on error port.
+ tuplesWrittenSuccessfully = commandsInBatch;
+ for (int i = 0; i < commandsInBatch; i++) {
+ if (updateCounts[i] == Statement.EXECUTE_FAILED) {
+ error.emit(tuples.get(i + batchStartIdx));
+ errorTuples++;
+ tuplesWrittenSuccessfully--;
+ }
+ }
}
}
@@ -163,6 +220,20 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
*/
protected abstract void setStatementParameters(PreparedStatement statement, T tuple) throws SQLException;
+ public int getTuplesWrittenSuccessfully()
+ {
+ return tuplesWrittenSuccessfully;
+ }
+
+ /**
+ * Setter for metric tuplesWrittenSuccessfully
+ * @param tuplesWrittenSuccessfully
+ */
+ public void setTuplesWrittenSuccessfully(int tuplesWrittenSuccessfully)
+ {
+ this.tuplesWrittenSuccessfully = tuplesWrittenSuccessfully;
+ }
+
private static final Logger logger = LoggerFactory.getLogger(AbstractJdbcTransactionableOutputOperator.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java
new file mode 100644
index 0000000..5c7e6e8
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * A {@link FieldInfo} object for Jdbc. <br/>
+ * Includes an SQL type for each field. <br/>
+ * A {@link JdbcFieldInfo} object used for JDBC output sources must carry the SQL data type.
+ * This is needed to create correct getters and setters for the POJO,
+ * as well as setting the right parameter types in the JDBC prepared statement.
+ */
+public class JdbcFieldInfo extends FieldInfo
+{
+ private int sqlType;
+
+ public JdbcFieldInfo()
+ {
+ }
+
+ public JdbcFieldInfo(String columnName, String pojoFieldExpression, SupportType type, int sqlType)
+ {
+ super(columnName, pojoFieldExpression, type);
+
+ this.sqlType = sqlType;
+ }
+
+ public int getSqlType()
+ {
+ return sqlType;
+ }
+
+ /**
+ * Set the sql data type for this {@link JdbcFieldInfo}
+ * @param sqlType
+ */
+ public void setSqlType(int sqlType)
+ {
+ this.sqlType = sqlType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
index 09bab2f..67ec023 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Types;
import java.util.List;
import org.slf4j.Logger;
@@ -60,7 +61,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
// Populate columnNames and columnDataTypes
try {
if (getFieldInfos() == null) { // then assume direct mapping
- LOG.info("Assuming direct mapping between POJO fields and DB columns");
+ LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns");
populateColumnDataTypes(null);
} else {
// FieldInfo supplied by user
@@ -84,20 +85,20 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
@Override
public void activate(OperatorContext context)
{
- if(getFieldInfos() == null) {
+ if (getFieldInfos() == null) {
Field[] fields = pojoClass.getDeclaredFields();
// Create fieldInfos in case of direct mapping
- List<FieldInfo> fieldInfos = Lists.newArrayList();
+ List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
for (int i = 0; i < columnNames.size(); i++) {
String columnName = columnNames.get(i);
String pojoField = getMatchingField(fields, columnName);
- if(columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls &&
- (pojoField == null || pojoField.length() == 0)) {
- throw new RuntimeException("Data for a non-nullable field not found in POJO");
+ if (columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls &&
+ (pojoField == null || pojoField.length() == 0)) {
+ throw new RuntimeException("Data for a non-nullable field: " + columnName + " not found in POJO");
} else {
- if(pojoField != null && pojoField.length() != 0) {
- FieldInfo fi = new FieldInfo(columnName, pojoField, null);
+ if (pojoField != null && pojoField.length() != 0) {
+ JdbcFieldInfo fi = new JdbcFieldInfo(columnName, pojoField, null, Types.NULL);
fieldInfos.add(fi);
} else {
columnDataTypes.remove(i);
@@ -138,7 +139,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
private String getMatchingField(Field[] fields, String columnName)
{
for (Field f: fields) {
- if(f.getName().equalsIgnoreCase(columnName)) {
+ if (f.getName().equalsIgnoreCase(columnName)) {
return f.getName();
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java
new file mode 100644
index 0000000..82ff043
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * <p>
+ * JdbcPOJONonInsertOutputOperator class.</p>
+ * An implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
+ *
+ * @displayName Jdbc Output Operator
+ * @category Output
+ * @tags database, sql, pojo, jdbc
+ * @since 2.1.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class JdbcPOJONonInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
+{
+ @NotNull
+ String sqlStatement;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+
+ columnDataTypes = Lists.newArrayList();
+ for (JdbcFieldInfo fi : getFieldInfos()) {
+ columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi));
+ columnDataTypes.add(fi.getSqlType());
+ }
+ }
+
+ @Override
+ protected String getUpdateCommand()
+ {
+ return sqlStatement;
+ }
+
+ /**
+ * Sets the parameterized SQL query for the JDBC update operation.
+ * This can be an update, delete or a merge query.
+ * Example: "update testTable set id = ? where name = ?"
+ * @param sqlStatement the SQL query
+ */
+ public void setSqlStatement(String sqlStatement)
+ {
+ this.sqlStatement = sqlStatement;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJONonInsertOutputOperator.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
index 4675cdb..dc695eb 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
@@ -18,6 +18,7 @@
*/
package com.datatorrent.lib.db.jdbc;
+import java.sql.Types;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -49,12 +50,12 @@ public class JdbcIOApp implements StreamingApplication
dag.getMeta(jdbcInputOperator).getMeta(jdbcInputOperator.outputPort).getAttributes()
.put(Context.PortContext.TUPLE_CLASS, JdbcIOAppTest.PojoEvent.class);
- JdbcPOJOOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOOutputOperator());
+ JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator());
JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
outputStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
outputStore.setDatabaseUrl("jdbc:hsqldb:mem:test");
jdbcOutputOperator.setStore(outputStore);
- jdbcOutputOperator.setFieldInfos(addFieldInfos());
+ jdbcOutputOperator.setFieldInfos(addJdbcFieldInfos());
jdbcOutputOperator.setTablename("test_app_output_event_table");
jdbcOutputOperator.setBatchSize(10);
dag.getMeta(jdbcOutputOperator).getMeta(jdbcOutputOperator.input).getAttributes()
@@ -72,4 +73,14 @@ public class JdbcIOApp implements StreamingApplication
fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
return fieldInfos;
}
+
+ private List<JdbcFieldInfo> addJdbcFieldInfos()
+ {
+ List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER, Types.INTEGER));
+ fieldInfos.add(new JdbcFieldInfo("NAME", "name", SupportType.STRING, Types.VARCHAR));
+ fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", SupportType.INTEGER, Types.INTEGER));
+ return fieldInfos;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 26196f5..e432ab3 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
+import java.sql.Types;
import java.util.List;
import javax.annotation.Nonnull;
@@ -44,6 +45,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -165,10 +167,10 @@ public class JdbcOperatorTest
+ "(id INTEGER not NULL,name VARCHAR(255),startDate DATE,startTime TIME,startTimestamp TIMESTAMP, PRIMARY KEY ( id ))";
stmt.executeUpdate(createPOJOTable);
String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_ID_DIFF
- + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))";
+ + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))";
stmt.executeUpdate(createPOJOTableIdDiff);
String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_NAME_DIFF
- + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))";
+ + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))";
stmt.executeUpdate(createPOJOTableNameDiff);
} catch (Throwable e) {
DTThrowable.rethrow(e);
@@ -290,6 +292,46 @@ public class JdbcOperatorTest
}
+ private static class TestPOJONonInsertOutputOperator extends JdbcPOJONonInsertOutputOperator
+ {
+ public TestPOJONonInsertOutputOperator()
+ {
+ cleanTable();
+ }
+
+ public int getNumOfEventsInStore()
+ {
+ Connection con;
+ try {
+ con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+
+ String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+
+ public int getDistinctNonUnique()
+ {
+ Connection con;
+ try {
+ con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+
+ String countQuery = "SELECT count(distinct(name)) from " + TABLE_POJO_NAME;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+ }
+
private static class TestInputOperator extends AbstractJdbcInputOperator<TestEvent>
{
@@ -369,9 +411,9 @@ public class JdbcOperatorTest
outputOperator.setBatchSize(3);
outputOperator.setTablename(TABLE_POJO_NAME);
- List<FieldInfo> fieldInfos = Lists.newArrayList();
- fieldInfos.add(new FieldInfo("ID", "id", null));
- fieldInfos.add(new FieldInfo("NAME", "name", null));
+ List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new JdbcFieldInfo("ID", "id", null, Types.INTEGER));
+ fieldInfos.add(new JdbcFieldInfo("NAME", "name", null, Types.VARCHAR));
outputOperator.setFieldInfos(fieldInfos);
outputOperator.setStore(transactionalStore);
@@ -401,6 +443,7 @@ public class JdbcOperatorTest
/**
* This test will assume direct mapping for POJO fields to DB columns
+ * All fields in DB present in POJO
*/
@Test
public void testJdbcPojoInsertOutputOperator()
@@ -428,12 +471,21 @@ public class JdbcOperatorTest
TestPortContext tpc = new TestPortContext(portAttributes);
outputOperator.input.setup(tpc);
+ CollectorTestSink<Object> errorSink = new CollectorTestSink<>();
+ TestUtils.setSink(outputOperator.error, errorSink);
+
outputOperator.activate(context);
List<TestPOJOEvent> events = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
events.add(new TestPOJOEvent(i, "test" + i));
}
+ events.add(new TestPOJOEvent(0, "test0")); // Records violating PK constraint
+ events.add(new TestPOJOEvent(2, "test2")); // Records violating PK constraint
+ events.add(new TestPOJOEvent(10, "test10")); // Clean record
+ events.add(new TestPOJOEvent(11, "test11")); // Clean record
+ events.add(new TestPOJOEvent(3, "test3")); // Records violating PK constraint
+ events.add(new TestPOJOEvent(12, "test12")); // Clean record
outputOperator.beginWindow(0);
for (TestPOJOEvent event : events) {
@@ -441,11 +493,15 @@ public class JdbcOperatorTest
}
outputOperator.endWindow();
- Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+ Assert.assertEquals("rows in db", 13, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+ Assert.assertEquals("Error tuples", 3, errorSink.collectedTuples.size());
}
/**
* This test will assume direct mapping for POJO fields to DB columns
+ * Nullable DB field missing in POJO
+ * name1 field, which is nullable in DB is missing from POJO
+ * POJO(id, name) -> DB(id, name1)
*/
@Test
public void testJdbcPojoInsertOutputOperatorNullName()
@@ -487,7 +543,108 @@ public class JdbcOperatorTest
outputOperator.endWindow();
Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
- Assert.assertEquals("null name rows in db", 10, outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+ Assert.assertEquals("null name rows in db", 10,
+ outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+ }
+
+ /**
+ * This test will assume direct mapping for POJO fields to DB columns.
+ * Non-Nullable DB field missing in POJO
+ * id1 field which is non-nullable in DB is missing from POJO
+ * POJO(id, name) -> DB(id1, name)
+ */
+ @Test
+ public void testJdbcPojoInsertOutputOperatorNullId()
+ {
+ JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+ transactionalStore.setDatabaseDriver(DB_DRIVER);
+ transactionalStore.setDatabaseUrl(URL);
+
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+ new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
+
+ TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+ outputOperator.setBatchSize(3);
+ outputOperator.setTablename(TABLE_POJO_NAME_ID_DIFF);
+
+ outputOperator.setStore(transactionalStore);
+
+ outputOperator.setup(context);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+ outputOperator.input.setup(tpc);
+
+ boolean exceptionOccurred = false;
+ try {
+ outputOperator.activate(context);
+ } catch (Exception e) {
+ exceptionOccurred = true;
+ Assert.assertTrue(e instanceof RuntimeException);
+ Assert.assertTrue(e.getMessage().toLowerCase().contains("id1 not found in pojo"));
+ }
+ Assert.assertTrue(exceptionOccurred);
+ }
+
+ @Test
+ public void testJdbcPojoOutputOperatorMerge()
+ {
+ JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+ transactionalStore.setDatabaseDriver(DB_DRIVER);
+ transactionalStore.setDatabaseUrl(URL);
+
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+ new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
+
+ TestPOJONonInsertOutputOperator updateOperator = new TestPOJONonInsertOutputOperator();
+ updateOperator.setBatchSize(3);
+
+ updateOperator.setStore(transactionalStore);
+
+ updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS T USING (VALUES (?, ?)) AS FOO(id, name) "
+ + "ON T.id = FOO.id "
+ + "WHEN MATCHED THEN UPDATE SET name = FOO.name "
+ + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, FOO.name);");
+
+ List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new JdbcFieldInfo("id", "id", null, Types.INTEGER));
+ fieldInfos.add(new JdbcFieldInfo("name", "name", null, Types.VARCHAR));
+ updateOperator.setFieldInfos(fieldInfos);
+ updateOperator.setup(context);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+ updateOperator.input.setup(tpc);
+
+ updateOperator.activate(context);
+
+ List<TestPOJOEvent> events = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ events.add(new TestPOJOEvent(i, "test" + i));
+ }
+ for (int i = 0; i < 5; i++) {
+ events.add(new TestPOJOEvent(i, "test" + 100));
+ }
+
+ updateOperator.getDistinctNonUnique();
+ updateOperator.beginWindow(0);
+ for (TestPOJOEvent event : events) {
+ updateOperator.input.process(event);
+ }
+ updateOperator.endWindow();
+
+ // Expect 10 unique ids: 0 - 9
+ Assert.assertEquals("rows in db", 10, updateOperator.getNumOfEventsInStore());
+ // Expect 6 unique names: test100, test5, test6, test7, test8, test9
+ Assert.assertEquals("rows in db", 6, updateOperator.getDistinctNonUnique());
}
@Test