You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/07/01 08:04:11 UTC

[3/3] apex-malhar git commit: APEXMALHAR-1953: Added JdbcPOJONonInsertOutputOperator for update / delete / merge queries. Added error port and error tuple handling to AbstractJdbcTransactionableOutputOperator, Added Autometrics, Added Unit tests and Refactored accordingly

APEXMALHAR-1953: Added JdbcPOJONonInsertOutputOperator for update / delete / merge queries. Added error port and error tuple handling to AbstractJdbcTransactionableOutputOperator, Added Autometrics, Added Unit tests and Refactored accordingly


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ddd5bcf1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ddd5bcf1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ddd5bcf1

Branch: refs/heads/master
Commit: ddd5bcf1a80d22d0e2727868339757dcce3bb2b8
Parents: f54ba32
Author: bhupesh <bh...@gmail.com>
Authored: Thu Mar 17 18:24:08 2016 +0530
Committer: bhupesh <bh...@gmail.com>
Committed: Fri Jul 1 11:53:43 2016 +0530

----------------------------------------------------------------------
 .../db/jdbc/AbstractJdbcPOJOOutputOperator.java |  29 ++--
 ...stractJdbcTransactionableOutputOperator.java |  77 ++++++++-
 .../datatorrent/lib/db/jdbc/JdbcFieldInfo.java  |  58 +++++++
 .../db/jdbc/JdbcPOJOInsertOutputOperator.java   |  19 ++-
 .../jdbc/JdbcPOJONonInsertOutputOperator.java   |  76 +++++++++
 .../com/datatorrent/lib/db/jdbc/JdbcIOApp.java  |  15 +-
 .../lib/db/jdbc/JdbcOperatorTest.java           | 171 ++++++++++++++++++-
 7 files changed, 411 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
index c310a40..45e0cbb 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
@@ -38,7 +38,6 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.lib.util.PojoUtils;
 import com.datatorrent.lib.util.PojoUtils.Getter;
 import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
@@ -61,7 +60,7 @@ import com.datatorrent.lib.util.PojoUtils.GetterShort;
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
 {
-  private List<FieldInfo> fieldInfos;
+  private List<JdbcFieldInfo> fieldInfos;
 
   protected List<Integer> columnDataTypes;
 
@@ -142,15 +141,15 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
           break;
 
         case Types.TIMESTAMP:
-          statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+          statement.setTimestamp(i + 1, ((Getter<Object, Timestamp>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case Types.TIME:
-          statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+          statement.setTime(i + 1, ((Getter<Object, Time>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         case Types.DATE:
-          statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)));
+          statement.setDate(i + 1, ((Getter<Object, Date>)activeFieldInfo.setterOrGetter).get(tuple));
           break;
 
         default:
@@ -169,7 +168,7 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
   /**
    * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
    */
-  public List<FieldInfo> getFieldInfos()
+  public List<JdbcFieldInfo> getFieldInfos()
   {
     return fieldInfos;
   }
@@ -182,7 +181,7 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
    * @description $[].pojoFieldExpression pojo field name or expression
    * @useSchema $[].pojoFieldExpression input.fields[].name
    */
-  public void setFieldInfos(List<FieldInfo> fieldInfos)
+  public void setFieldInfos(List<JdbcFieldInfo> fieldInfos)
   {
     this.fieldInfos = fieldInfos;
   }
@@ -195,6 +194,10 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
     return tablename;
   }
 
+  /**
+   * Set the target table name in database
+   * @param tablename
+   */
   public void setTablename(String tablename)
   {
     this.tablename = tablename;
@@ -259,18 +262,18 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac
           break;
 
         case Types.TIMESTAMP:
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
-              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Timestamp.class);
           break;
 
         case Types.TIME:
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
-              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Time.class);
           break;
 
         case Types.DATE:
-          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass,
-              activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass,
+              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Date.class);
           break;
 
         default:

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
index d3300fc..0a7f3fd 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
@@ -18,8 +18,10 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
+import java.sql.BatchUpdateException;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.List;
 
 import javax.validation.constraints.Min;
@@ -28,10 +30,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
 
 /**
@@ -69,6 +73,14 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
   private transient int batchStartIdx;
   private transient PreparedStatement updateCommand;
 
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> error = new DefaultOutputPort<>();
+
+  @AutoMetric
+  private int tuplesWrittenSuccessfully;
+  @AutoMetric
+  private int errorTuples;
+
   public AbstractJdbcTransactionableOutputOperator()
   {
     tuples = Lists.newArrayList();
@@ -95,6 +107,14 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
   }
 
   @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    tuplesWrittenSuccessfully = 0;
+    errorTuples = 0;
+  }
+
+  @Override
   public void endWindow()
   {
     if (tuples.size() - batchStartIdx > 0) {
@@ -129,10 +149,47 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
       }
       updateCommand.executeBatch();
       updateCommand.clearBatch();
+      batchStartIdx += tuples.size() - batchStartIdx;
+    } catch (BatchUpdateException bue) {
+      logger.error(bue.getMessage());
+      processUpdateCounts(bue.getUpdateCounts(), tuples.size() - batchStartIdx);
     } catch (SQLException e) {
       throw new RuntimeException("processing batch", e);
-    } finally {
-      batchStartIdx += tuples.size() - batchStartIdx;
+    }
+  }
+
+  /**
+   * Identify which commands in the batch failed and redirect these on the error port.
+   * See https://docs.oracle.com/javase/7/docs/api/java/sql/BatchUpdateException.html for more details
+   *
+   * @param updateCounts
+   * @param commandsInBatch
+   */
+  private void processUpdateCounts(int[] updateCounts, int commandsInBatch)
+  {
+    if (updateCounts.length < commandsInBatch) {
+      // Driver chose not to continue processing after failure.
+      error.emit(tuples.get(updateCounts.length + batchStartIdx));
+      errorTuples++;
+      // In this case, updateCounts.length is the number of successful queries
+      tuplesWrittenSuccessfully += updateCounts.length;
+      // Skip the error record
+      batchStartIdx += updateCounts.length + 1;
+      // And process the remaining if any
+      if ((tuples.size() - batchStartIdx) > 0) {
+        processBatch();
+      }
+    } else {
+      // Driver processed all batch statements in spite of failures.
+      // Pick out the failures and send on error port.
+      tuplesWrittenSuccessfully = commandsInBatch;
+      for (int i = 0; i < commandsInBatch; i++) {
+        if (updateCounts[i] == Statement.EXECUTE_FAILED) {
+          error.emit(tuples.get(i + batchStartIdx));
+          errorTuples++;
+          tuplesWrittenSuccessfully--;
+        }
+      }
     }
   }
 
@@ -163,6 +220,20 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
    */
   protected abstract void setStatementParameters(PreparedStatement statement, T tuple) throws SQLException;
 
+  public int getTuplesWrittenSuccessfully()
+  {
+    return tuplesWrittenSuccessfully;
+  }
+
+  /**
+   * Setter for metric tuplesWrittenSuccessfully
+   * @param tuplesWrittenSuccessfully
+   */
+  public void setTuplesWrittenSuccessfully(int tuplesWrittenSuccessfully)
+  {
+    this.tuplesWrittenSuccessfully = tuplesWrittenSuccessfully;
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(AbstractJdbcTransactionableOutputOperator.class);
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java
new file mode 100644
index 0000000..5c7e6e8
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcFieldInfo.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * A {@link FieldInfo} object for Jdbc. <br/>
+ * Includes an SQL type for each field. <br/>
+ * A {@link FieldInfo} object used for JDBC output sources must have the SQL data types.
+ * This is needed to create correct getters and setters for the POJO,
+ * as well as setting the right parameter types in the JDBC prepared statement.
+ */
+public class JdbcFieldInfo extends FieldInfo
+{
+  private int sqlType;
+
+  public JdbcFieldInfo()
+  {
+  }
+
+  public JdbcFieldInfo(String columnName, String pojoFieldExpression, SupportType type, int sqlType)
+  {
+    super(columnName, pojoFieldExpression, type);
+
+    this.sqlType = sqlType;
+  }
+
+  public int getSqlType()
+  {
+    return sqlType;
+  }
+
+  /**
+   * Set the sql data type for this {@link JdbcFieldInfo}
+   * @param sqlType
+   */
+  public void setSqlType(int sqlType)
+  {
+    this.sqlType = sqlType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
index 09bab2f..67ec023 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -60,7 +61,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
     // Populate columnNames and columnDataTypes
     try {
       if (getFieldInfos() == null) { // then assume direct mapping
-        LOG.info("Assuming direct mapping between POJO fields and DB columns");
+        LOG.info("FieldInfo missing. Assuming direct mapping between POJO fields and DB columns");
         populateColumnDataTypes(null);
       } else {
         // FieldInfo supplied by user
@@ -84,20 +85,20 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
   @Override
   public void activate(OperatorContext context)
   {
-    if(getFieldInfos() == null) {
+    if (getFieldInfos() == null) {
       Field[] fields = pojoClass.getDeclaredFields();
       // Create fieldInfos in case of direct mapping
-      List<FieldInfo> fieldInfos = Lists.newArrayList();
+      List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
       for (int i = 0; i < columnNames.size(); i++) {
         String columnName = columnNames.get(i);
         String pojoField = getMatchingField(fields, columnName);
 
-        if(columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls &&
-                (pojoField == null || pojoField.length() == 0)) {
-          throw new RuntimeException("Data for a non-nullable field not found in POJO");
+        if (columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls &&
+            (pojoField == null || pojoField.length() == 0)) {
+          throw new RuntimeException("Data for a non-nullable field: " + columnName + " not found in POJO");
         } else {
-          if(pojoField != null && pojoField.length() != 0) {
-            FieldInfo fi = new FieldInfo(columnName, pojoField, null);
+          if (pojoField != null && pojoField.length() != 0) {
+            JdbcFieldInfo fi = new JdbcFieldInfo(columnName, pojoField, null, Types.NULL);
             fieldInfos.add(fi);
           } else {
             columnDataTypes.remove(i);
@@ -138,7 +139,7 @@ public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
   private String getMatchingField(Field[] fields, String columnName)
   {
     for (Field f: fields) {
-      if(f.getName().equalsIgnoreCase(columnName)) {
+      if (f.getName().equalsIgnoreCase(columnName)) {
         return f.getName();
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java
new file mode 100644
index 0000000..82ff043
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJONonInsertOutputOperator.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * <p>
+ * JdbcPOJONonInsertOutputOperator class.</p>
+ * An implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
+ *
+ * @displayName Jdbc Output Operator
+ * @category Output
+ * @tags database, sql, pojo, jdbc
+ * @since 2.1.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class JdbcPOJONonInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
+{
+  @NotNull
+  String sqlStatement;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+
+    columnDataTypes = Lists.newArrayList();
+    for (JdbcFieldInfo fi : getFieldInfos()) {
+      columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi));
+      columnDataTypes.add(fi.getSqlType());
+    }
+  }
+
+  @Override
+  protected String getUpdateCommand()
+  {
+    return sqlStatement;
+  }
+
+  /**
+   * Sets the parameterized SQL query for the JDBC update operation.
+   * This can be an update, delete or a merge query.
+   * Example: "update testTable set id = ? where name = ?"
+   * @param sqlStatement the SQL query
+   */
+  public void setSqlStatement(String sqlStatement)
+  {
+    this.sqlStatement = sqlStatement;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJONonInsertOutputOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
index 4675cdb..dc695eb 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOApp.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
+import java.sql.Types;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -49,12 +50,12 @@ public class JdbcIOApp implements StreamingApplication
     dag.getMeta(jdbcInputOperator).getMeta(jdbcInputOperator.outputPort).getAttributes()
         .put(Context.PortContext.TUPLE_CLASS, JdbcIOAppTest.PojoEvent.class);
 
-    JdbcPOJOOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOOutputOperator());
+    JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator());
     JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
     outputStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
     outputStore.setDatabaseUrl("jdbc:hsqldb:mem:test");
     jdbcOutputOperator.setStore(outputStore);
-    jdbcOutputOperator.setFieldInfos(addFieldInfos());
+    jdbcOutputOperator.setFieldInfos(addJdbcFieldInfos());
     jdbcOutputOperator.setTablename("test_app_output_event_table");
     jdbcOutputOperator.setBatchSize(10);
     dag.getMeta(jdbcOutputOperator).getMeta(jdbcOutputOperator.input).getAttributes()
@@ -72,4 +73,14 @@ public class JdbcIOApp implements StreamingApplication
     fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
     return fieldInfos;
   }
+
+  private List<JdbcFieldInfo> addJdbcFieldInfos()
+  {
+    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new JdbcFieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER, Types.INTEGER));
+    fieldInfos.add(new JdbcFieldInfo("NAME", "name", SupportType.STRING, Types.VARCHAR));
+    fieldInfos.add(new JdbcFieldInfo("AMOUNT", "amount", SupportType.INTEGER, Types.INTEGER));
+    return fieldInfos;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ddd5bcf1/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 26196f5..e432ab3 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.List;
 
 import javax.annotation.Nonnull;
@@ -44,6 +45,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.helper.TestPortContext;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.TestUtils;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -165,10 +167,10 @@ public class JdbcOperatorTest
           + "(id INTEGER not NULL,name VARCHAR(255),startDate DATE,startTime TIME,startTimestamp TIMESTAMP, PRIMARY KEY ( id ))";
       stmt.executeUpdate(createPOJOTable);
       String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_ID_DIFF
-              + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))";
+          + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))";
       stmt.executeUpdate(createPOJOTableIdDiff);
       String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_NAME_DIFF
-              + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))";
+          + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))";
       stmt.executeUpdate(createPOJOTableNameDiff);
     } catch (Throwable e) {
       DTThrowable.rethrow(e);
@@ -290,6 +292,46 @@ public class JdbcOperatorTest
 
   }
 
+  private static class TestPOJONonInsertOutputOperator extends JdbcPOJONonInsertOutputOperator
+  {
+    public TestPOJONonInsertOutputOperator()
+    {
+      cleanTable();
+    }
+
+    public int getNumOfEventsInStore()
+    {
+      Connection con;
+      try {
+        con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+
+        String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        return resultSet.getInt(1);
+      } catch (SQLException e) {
+        throw new RuntimeException("fetching count", e);
+      }
+    }
+
+    public int getDistinctNonUnique()
+    {
+      Connection con;
+      try {
+        con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+
+        String countQuery = "SELECT count(distinct(name)) from " + TABLE_POJO_NAME;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        return resultSet.getInt(1);
+      } catch (SQLException e) {
+        throw new RuntimeException("fetching count", e);
+      }
+    }
+  }
+
   private static class TestInputOperator extends AbstractJdbcInputOperator<TestEvent>
   {
 
@@ -369,9 +411,9 @@ public class JdbcOperatorTest
     outputOperator.setBatchSize(3);
     outputOperator.setTablename(TABLE_POJO_NAME);
 
-    List<FieldInfo> fieldInfos = Lists.newArrayList();
-    fieldInfos.add(new FieldInfo("ID", "id", null));
-    fieldInfos.add(new FieldInfo("NAME", "name", null));
+    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new JdbcFieldInfo("ID", "id", null, Types.INTEGER));
+    fieldInfos.add(new JdbcFieldInfo("NAME", "name", null, Types.VARCHAR));
     outputOperator.setFieldInfos(fieldInfos);
 
     outputOperator.setStore(transactionalStore);
@@ -401,6 +443,7 @@ public class JdbcOperatorTest
 
   /**
    * This test will assume direct mapping for POJO fields to DB columns
+   * All fields in DB present in POJO
    */
   @Test
   public void testJdbcPojoInsertOutputOperator()
@@ -428,12 +471,21 @@ public class JdbcOperatorTest
     TestPortContext tpc = new TestPortContext(portAttributes);
     outputOperator.input.setup(tpc);
 
+    CollectorTestSink<Object> errorSink = new CollectorTestSink<>();
+    TestUtils.setSink(outputOperator.error, errorSink);
+
     outputOperator.activate(context);
 
     List<TestPOJOEvent> events = Lists.newArrayList();
     for (int i = 0; i < 10; i++) {
       events.add(new TestPOJOEvent(i, "test" + i));
     }
+    events.add(new TestPOJOEvent(0, "test0")); // Records violating PK constraint
+    events.add(new TestPOJOEvent(2, "test2")); // Records violating PK constraint
+    events.add(new TestPOJOEvent(10, "test10")); // Clean record
+    events.add(new TestPOJOEvent(11, "test11")); // Clean record
+    events.add(new TestPOJOEvent(3, "test3")); // Records violating PK constraint
+    events.add(new TestPOJOEvent(12, "test12")); // Clean record
 
     outputOperator.beginWindow(0);
     for (TestPOJOEvent event : events) {
@@ -441,11 +493,15 @@ public class JdbcOperatorTest
     }
     outputOperator.endWindow();
 
-    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+    Assert.assertEquals("rows in db", 13, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+    Assert.assertEquals("Error tuples", 3, errorSink.collectedTuples.size());
   }
 
   /**
    * This test will assume direct mapping for POJO fields to DB columns
+   * Nullable DB field missing in POJO
+   * name1 field, which is nullable in DB is missing from POJO
+   * POJO(id, name) -> DB(id, name1)
    */
   @Test
   public void testJdbcPojoInsertOutputOperatorNullName()
@@ -487,7 +543,108 @@ public class JdbcOperatorTest
     outputOperator.endWindow();
 
     Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
-    Assert.assertEquals("null name rows in db", 10, outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+    Assert.assertEquals("null name rows in db", 10,
+        outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+  }
+
+  /**
+   * This test will assume direct mapping for POJO fields to DB columns.
+   * Non-Nullable DB field missing in POJO
+   * id1 field which is non-nullable in DB is missing from POJO
+   * POJO(id, name) -> DB(id1, name)
+   */
+  @Test
+  public void testJdbcPojoInsertOutputOperatorNullId()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME_ID_DIFF);
+
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    boolean exceptionOccurred = false;
+    try {
+      outputOperator.activate(context);
+    } catch (Exception e) {
+      exceptionOccurred = true;
+      Assert.assertTrue(e instanceof RuntimeException);
+      Assert.assertTrue(e.getMessage().toLowerCase().contains("id1 not found in pojo"));
+    }
+    Assert.assertTrue(exceptionOccurred);
+  }
+
+  @Test
+  public void testJdbcPojoOutputOperatorMerge()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJONonInsertOutputOperator updateOperator = new TestPOJONonInsertOutputOperator();
+    updateOperator.setBatchSize(3);
+
+    updateOperator.setStore(transactionalStore);
+
+    updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS T USING (VALUES (?, ?)) AS FOO(id, name) "
+        + "ON T.id = FOO.id "
+        + "WHEN MATCHED THEN UPDATE SET name = FOO.name "
+        + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, FOO.name);");
+
+    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new JdbcFieldInfo("id", "id", null, Types.INTEGER));
+    fieldInfos.add(new JdbcFieldInfo("name", "name", null, Types.VARCHAR));
+    updateOperator.setFieldInfos(fieldInfos);
+    updateOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    updateOperator.input.setup(tpc);
+
+    updateOperator.activate(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+    for (int i = 0; i < 5; i++) {
+      events.add(new TestPOJOEvent(i, "test" + 100));
+    }
+
+    updateOperator.getDistinctNonUnique();
+    updateOperator.beginWindow(0);
+    for (TestPOJOEvent event : events) {
+      updateOperator.input.process(event);
+    }
+    updateOperator.endWindow();
+
+    // Expect 10 unique ids: 0 - 9
+    Assert.assertEquals("rows in db", 10, updateOperator.getNumOfEventsInStore());
+    // Expect 6 unique names: test100, test5, test6, test7, test8, test9
+    Assert.assertEquals("rows in db", 6, updateOperator.getDistinctNonUnique());
   }
 
   @Test