You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/05/03 21:22:03 UTC

apex-malhar git commit: APEXMALHAR-2278.KuduNonTransactionalOutputOperator

Repository: apex-malhar
Updated Branches:
  refs/heads/master 2fe2903bf -> 10dd94ef5


APEXMALHAR-2278.KuduNonTransactionalOutputOperator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/10dd94ef
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/10dd94ef
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/10dd94ef

Branch: refs/heads/master
Commit: 10dd94ef56e81a795a9a7295e74b686ffd79b255
Parents: 2fe2903
Author: Ananth <an...@gmail.com>
Authored: Thu May 4 06:19:05 2017 +1000
Committer: Ananth <an...@gmail.com>
Committed: Thu May 4 06:19:05 2017 +1000

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../kudu/AbstractKuduOutputOperator.java        | 660 +++++++++++++++++++
 .../malhar/contrib/kudu/ApexKuduConnection.java | 223 +++++++
 .../contrib/kudu/BaseKuduOutputOperator.java    | 158 +++++
 .../contrib/kudu/KuduExecutionContext.java      | 109 +++
 .../malhar/contrib/kudu/KuduMutationType.java   |  32 +
 ...uduCreateUpdateDeleteOutputOperatorTest.java | 336 ++++++++++
 .../contrib/kudu/SimpleKuduOutputOperator.java  |  52 ++
 .../malhar/contrib/kudu/UnitTestTablePojo.java  | 125 ++++
 .../resources/kuduoutputoperator.properties     |  22 +
 10 files changed, 1723 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 893ec2d..83305cb 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -678,5 +678,11 @@
       <artifactId>jackson-databind</artifactId>
       <version>2.7.0</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>1.3.0</version>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
new file mode 100644
index 0000000..250334b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
@@ -0,0 +1,660 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Statistics;
+import org.apache.kudu.client.Update;
+import org.apache.kudu.client.Upsert;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.PojoUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An Abstract operator that would allow concrete implementations to write a POJO value into a given Kudu table.
+ * <p>
+ * To use this operator, the following needs to be done by the implementor
+ * <ol>
+ * <li>Create a concrete implementation of this operator and implement the method to define the connection
+ * properties to Kudu. The connection properties are set using the {@link ApexKuduConnection} using a builder pattern.
+ * </li>
+ * <li>Implement the logic how tuples need to ne handled in the event of a reconciliation phase ( i.e. when an
+ * operator is resuming back from failure ). See javadoc of the method for more details.</li>
+ * <li>Define the payload class</li>
+ * </ol>
+ * </p>
+ * <p>
+ * Note that the tuple that is getting processed is not the POJO class. The tuple that is getting processed is the
+ * {@link KuduExecutionContext} instance. This is because the operator supports mutation types as a higher level
+ * construct than simply writing a POJO into a Kudu table row.
+ * </p>
+ * <p>
+ * Supported mutations are:
+ * <ol>
+ *      <li>INSERT</li>
+ *      <li>UPDATE</li>
+ *      <li>UPSERT</li>
+ *      <li>DELETE</li>
+ * </ol>
+ * </p>
+ * <p>
+ * Please note that the Update mutation allows to change a subset of columns only even though there might be columns
+ * that were defined to be non-nullable. This is because the original mutation of type insert would have written the
+ * mandatory columns. In such scenarios, the method setDoNotWriteColumns() in {@link KuduExecutionContext} can be
+ * used to specify only those columns that need an update. This ways a read and Update pattern can be merged to a
+ * simple update pattern thus avoiding a read if required.</p>
+ * */
+@InterfaceStability.Evolving
+public abstract class AbstractKuduOutputOperator extends BaseOperator
+    implements Operator.ActivationListener<Context.OperatorContext>,Operator.CheckpointNotificationListener
+{
+
+  private transient ApexKuduConnection apexKuduConnection;
+
+  private transient  KuduTable kuduTable;
+
+  private transient  KuduSession kuduSession;
+
+  private transient KuduClient kuduClientHandle;
+
+  private transient Map<String,ColumnSchema> allColumnDefs;
+
+  private transient Map<String,Object> kuduColumnBasedGetters;
+
+  private Set<String> primaryKeyColumnNames;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractKuduOutputOperator.class);
+
+  @NotNull
+  private WindowDataManager windowDataManager;
+
+  private transient long currentWindowId;
+
+  private transient boolean isInReplayMode;
+
+  private transient boolean isInReconcilingMode;
+
+  private transient long reconcilingWindowId;
+
+  @AutoMetric
+  transient long numInserts;
+
+  @AutoMetric
+  transient long numUpserts;
+
+  @AutoMetric
+  transient long numDeletes;
+
+  @AutoMetric
+  transient long numUpdates;
+
+  @AutoMetric
+  transient long numOpsErrors;
+
+  @AutoMetric
+  transient long numBytesWritten;
+
+  @AutoMetric
+  transient long numRpcErrors;
+
+  @AutoMetric
+  transient long numWriteOps;
+
+  @AutoMetric
+  transient long numWriteRPCs;
+
+  @AutoMetric
+  long totalOpsErrors = 0;
+
+  @AutoMetric
+  long totalBytesWritten = 0;
+
+  @AutoMetric
+  long totalRpcErrors = 0;
+
+  @AutoMetric
+  long totalWriteOps = 0;
+
+  @AutoMetric
+  long totalWriteRPCs = 0;
+
+  @AutoMetric
+  long totalInsertsSinceStart;
+
+  @AutoMetric
+  long totalUpsertsSinceStart;
+
+  @AutoMetric
+  long totalDeletesSinceStart;
+
+  @AutoMetric
+  long totalUpdatesSinceStart;
+
+  public final transient DefaultInputPort<KuduExecutionContext> input = new DefaultInputPort<KuduExecutionContext>()
+  {
+    @Override
+    public void process(KuduExecutionContext kuduExecutionContext)
+    {
+      processTuple(kuduExecutionContext);
+    }
+  }; // end input port implementation
+
+  public void processTuple(KuduExecutionContext kuduExecutionContext)
+  {
+    if ( isInReconcilingMode || isInReplayMode) {
+      if ( !isEligibleForPassivationInReconcilingWindow(kuduExecutionContext, currentWindowId)) {
+        return;
+      }
+    }
+    KuduMutationType mutationType = kuduExecutionContext.getMutationType();
+    switch (mutationType) {
+      case DELETE:
+        processForDelete(kuduExecutionContext);
+        numDeletes += 1;
+        totalDeletesSinceStart += 1;
+        break;
+      case INSERT:
+        processForInsert(kuduExecutionContext);
+        numInserts += 1;
+        totalInsertsSinceStart += 1;
+        break;
+      case UPDATE:
+        processForUpdate(kuduExecutionContext);
+        numUpdates += 1;
+        totalUpdatesSinceStart += 1;
+        break;
+      case UPSERT:
+        processForUpsert(kuduExecutionContext);
+        numUpserts += 1;
+        totalUpsertsSinceStart += 1;
+        break;
+      default:
+        break;
+    }
+  }
+
+  /***
+   * Sets the values from the Pojo into the Kudu mutation object.
+   * @param currentOperation The operation instance that represents the current mutation. This will be applied to the
+   *                         current session
+   * @param kuduExecutionContext The tuple that contains the payload as well as other information like mutation type etc
+   */
+  @SuppressWarnings(value = "unchecked")
+  private void performCommonProcessing(Operation currentOperation, KuduExecutionContext kuduExecutionContext)
+  {
+    currentOperation.setExternalConsistencyMode(kuduExecutionContext.getExternalConsistencyMode());
+    currentOperation.setPropagatedTimestamp(kuduExecutionContext.getPropagatedTimestamp());
+    PartialRow partialRow = currentOperation.getRow();
+    Object payload = kuduExecutionContext.getPayload();
+    Set<String> doNotWriteColumns = kuduExecutionContext.getDoNotWriteColumns();
+    if (doNotWriteColumns == null) {
+      doNotWriteColumns = new HashSet<>();
+    }
+    for (String columnName: kuduColumnBasedGetters.keySet()) {
+      if ( doNotWriteColumns.contains(columnName)) {
+        continue;
+      }
+      ColumnSchema columnSchema = allColumnDefs.get(columnName);
+      Type dataType = columnSchema.getType();
+      try {
+        switch (dataType) {
+          case STRING:
+            PojoUtils.Getter<Object, String> stringGetter = ((PojoUtils.Getter<Object, String>)kuduColumnBasedGetters
+                .get(columnName));
+            if (stringGetter != null) {
+              final String stringValue = stringGetter.get(payload);
+              if (stringValue != null) {
+                partialRow.addString(columnName, stringValue);
+              }
+            }
+            break;
+          case BINARY:
+            PojoUtils.Getter<Object, ByteBuffer> byteBufferGetter = ((PojoUtils.Getter<Object, ByteBuffer>)
+                kuduColumnBasedGetters.get(columnName));
+            if (byteBufferGetter != null) {
+              final ByteBuffer byteBufferValue = byteBufferGetter.get(payload);
+              if (byteBufferValue != null) {
+                partialRow.addBinary(columnName, byteBufferValue);
+              }
+            }
+            break;
+          case BOOL:
+            PojoUtils.GetterBoolean<Object> boolGetter = ((PojoUtils.GetterBoolean<Object>)kuduColumnBasedGetters.get(
+                columnName));
+            if (boolGetter != null) {
+              final boolean boolValue = boolGetter.get(payload);
+              partialRow.addBoolean(columnName, boolValue);
+            }
+            break;
+          case DOUBLE:
+            PojoUtils.GetterDouble<Object> doubleGetter = ((PojoUtils.GetterDouble<Object>)kuduColumnBasedGetters.get(
+                columnName));
+            if (doubleGetter != null) {
+              final double doubleValue = doubleGetter.get(payload);
+              partialRow.addDouble(columnName, doubleValue);
+            }
+            break;
+          case FLOAT:
+            PojoUtils.GetterFloat<Object> floatGetter = ((PojoUtils.GetterFloat<Object>)kuduColumnBasedGetters.get(
+                columnName));
+            if (floatGetter != null) {
+              final float floatValue = floatGetter.get(payload);
+              partialRow.addFloat(columnName, floatValue);
+            }
+            break;
+          case INT8:
+            PojoUtils.GetterByte<Object> byteGetter = ((PojoUtils.GetterByte<Object>)kuduColumnBasedGetters.get(
+                columnName));
+            if (byteGetter != null) {
+              final byte byteValue = byteGetter.get(payload);
+              partialRow.addByte(columnName, byteValue);
+            }
+            break;
+          case INT16:
+            PojoUtils.GetterShort<Object> shortGetter = ((PojoUtils.GetterShort<Object>)kuduColumnBasedGetters.get(
+                columnName));
+            if (shortGetter != null) {
+              final short shortValue = shortGetter.get(payload);
+              partialRow.addShort(columnName, shortValue);
+            }
+            break;
+          case INT32:
+            PojoUtils.GetterInt<Object> intGetter = ((PojoUtils.GetterInt<Object>)
+                kuduColumnBasedGetters.get(columnName));
+            if (intGetter != null) {
+              final int intValue = intGetter.get(payload);
+              partialRow.addInt(columnName, intValue);
+            }
+            break;
+          case INT64:
+          case UNIXTIME_MICROS:
+            PojoUtils.GetterLong<Object> longGetter = ((PojoUtils.GetterLong<Object>)kuduColumnBasedGetters.get(
+                columnName));
+            if (longGetter != null) {
+              final long longValue = longGetter.get(payload);
+              partialRow.addLong(columnName, longValue);
+            }
+            break;
+          default:
+            LOG.error(columnName + " is not of the supported data type");
+            throw new UnsupportedOperationException("Kudu does not support data type for column " + columnName);
+        }
+      } catch ( Exception ex ) {
+        LOG.error(" Exception while fetching the value of " + columnName + " because " + ex.getMessage());
+        partialRow.setNull(columnName);
+      }
+    }
+    try {
+      kuduSession.apply(currentOperation);
+    } catch (KuduException e) {
+      LOG.error("Could not execute operation because " + e.getMessage(), e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  protected void processForUpdate(KuduExecutionContext kuduExecutionContext)
+  {
+    Update thisUpdate = kuduTable.newUpdate();
+    performCommonProcessing(thisUpdate,kuduExecutionContext);
+  }
+
+
+  protected void processForUpsert(KuduExecutionContext kuduExecutionContext)
+  {
+    Upsert thisUpsert = kuduTable.newUpsert();
+    performCommonProcessing(thisUpsert,kuduExecutionContext);
+  }
+
+
+
+  protected void processForDelete(KuduExecutionContext kuduExecutionContext)
+  {
+    Delete thisDelete = kuduTable.newDelete();
+    // Kudu does not allow column values to be set in case of a delete mutation
+    Set<String> doNotWriteCols = kuduExecutionContext.getDoNotWriteColumns();
+    if ( doNotWriteCols == null) {
+      doNotWriteCols = new HashSet<>();
+    }
+    doNotWriteCols.clear();
+    for (String columnName : allColumnDefs.keySet()) {
+      if ( !(primaryKeyColumnNames.contains(columnName))) {
+        doNotWriteCols.add(columnName);
+      }
+    }
+    kuduExecutionContext.setDoNotWriteColumns(doNotWriteCols);
+    performCommonProcessing(thisDelete,kuduExecutionContext);
+  }
+
+
+  protected void processForInsert(KuduExecutionContext kuduExecutionContext)
+  {
+    Insert thisInsert = kuduTable.newInsert();
+    performCommonProcessing(thisInsert,kuduExecutionContext);
+  }
+
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+    ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder = getKuduConnectionConfig();
+    apexKuduConnection = apexKuduConnectionBuilder.build();
+    checkNotNull(apexKuduConnection,"Kudu connection cannot be null");
+    kuduTable = apexKuduConnection.getKuduTable();
+    kuduSession = apexKuduConnection.getKuduSession();
+    kuduClientHandle = apexKuduConnection.getKuduClient();
+    checkNotNull(kuduTable,"Kudu Table cannot be null");
+    checkNotNull(kuduSession, "Kudu session cannot be null");
+    allColumnDefs = new HashMap();
+    primaryKeyColumnNames = new HashSet<>();
+    kuduColumnBasedGetters = new HashMap();
+    buildGettersForPojoPayload();
+    reconcilingWindowId = Stateless.WINDOW_ID;
+    // The operator is working in a replay mode where the upstream buffer is re-streaming the tuples
+    // Note that there are two windows that need special core. The window that is being replayed and the subsequent
+    // window that might have resulted in a crash which we are referring as reconciling window
+    if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) &&
+        context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) <
+        windowDataManager.getLargestCompletedWindow()) {
+      reconcilingWindowId = windowDataManager.getLargestCompletedWindow() + 1;
+    }
+
+    if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) &&
+        context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) ==
+        windowDataManager.getLargestCompletedWindow()) {
+      reconcilingWindowId = windowDataManager.getLargestCompletedWindow();
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+    try {
+      apexKuduConnection.close();
+    } catch (Exception e) {
+      LOG.error("Could not close kudu session and resources because " + e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long l)
+  {
+    // Nothing to be done here. Child classes can use this if required
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+    // Nothing to be done here. Child classes can use this if required
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      windowDataManager.committed(windowId);
+    } catch (IOException e) {
+      LOG.error("Error while committing the window id " + windowId + " because " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    windowDataManager = getWindowDataManager();
+    if ( windowDataManager == null) {
+      windowDataManager = new FSWindowDataManager();
+    }
+    windowDataManager.setup(context);
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowDataManager.teardown();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    currentWindowId = windowId;
+    if ( currentWindowId != Stateless.WINDOW_ID) { // if it is not the first window of the application
+      if (currentWindowId > reconcilingWindowId) {
+        isInReplayMode = false;
+        isInReconcilingMode = false;
+      }
+      if (currentWindowId == reconcilingWindowId) {
+        isInReconcilingMode = true;
+        isInReplayMode = false;
+      }
+      if (currentWindowId < reconcilingWindowId) {
+        isInReconcilingMode = false;
+        isInReplayMode = true;
+      }
+    }
+    numDeletes = 0;
+    numInserts = 0;
+    numUpdates = 0;
+    numUpserts = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    try {
+      kuduSession.flush();
+    } catch (KuduException e) {
+      LOG.error("Could not flush kudu session on an end window boundary " + e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+    if ( currentWindowId > windowDataManager.getLargestCompletedWindow()) {
+      try {
+        windowDataManager.save(currentWindowId,currentWindowId);
+      } catch (IOException e) {
+        LOG.error("Error while persisting the current window state " + currentWindowId + " because " + e.getMessage());
+        throw new RuntimeException(e);
+      }
+    }
+    numOpsErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.OPS_ERRORS) -
+      totalOpsErrors;
+    numBytesWritten = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.BYTES_WRITTEN) -
+      totalBytesWritten;
+    numRpcErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.RPC_ERRORS) -
+      totalRpcErrors;
+    numWriteOps = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_OPS) -
+      totalWriteOps;
+    numWriteRPCs = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_RPCS) - totalWriteOps;
+    totalOpsErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.OPS_ERRORS);
+    totalBytesWritten = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.BYTES_WRITTEN);
+    totalRpcErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.RPC_ERRORS);
+    totalWriteOps = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_OPS);
+    totalWriteRPCs = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_RPCS);
+  }
+
+  private void buildGettersForPojoPayload()
+  {
+    Class payloadClass = getTuplePayloadClass();
+    checkNotNull(payloadClass,"Payload class cannot be null");
+    Field[] classFields = payloadClass.getDeclaredFields();
+    Schema schemaInfo = kuduTable.getSchema();
+    List<ColumnSchema> allColumns = schemaInfo.getColumns();
+    Set<String> allKuduTableColumnNames = new HashSet<>();
+    Map<String,ColumnSchema> normalizedColumns = new HashMap();
+    for ( ColumnSchema aColumnDef : allColumns) {
+      allColumnDefs.put(aColumnDef.getName(), aColumnDef);
+      normalizedColumns.put(aColumnDef.getName().toLowerCase(), aColumnDef);
+      allKuduTableColumnNames.add(aColumnDef.getName().toLowerCase());
+    }
+    List<ColumnSchema> primaryKeyColumns = schemaInfo.getPrimaryKeyColumns();
+    for (ColumnSchema primaryKeyInfo : primaryKeyColumns) {
+      primaryKeyColumnNames.add(primaryKeyInfo.getName());
+    }
+    Map<String,String> columnNameOverrides = getOverridingColumnNameMap();
+    if (columnNameOverrides == null) {
+      columnNameOverrides = new HashMap(); // to avoid null checks further down the line
+    }
+    for ( Field aFieldDef : classFields) {
+      String currentFieldName = aFieldDef.getName().toLowerCase();
+      if (allKuduTableColumnNames.contains(currentFieldName)) {
+        extractGetterForColumn(normalizedColumns.get(currentFieldName),aFieldDef);
+      } else {
+        if (columnNameOverrides.containsKey(aFieldDef.getName())) {
+          extractGetterForColumn(normalizedColumns.get(columnNameOverrides.get(aFieldDef.getName()).toLowerCase()),
+              aFieldDef);
+        } else if (columnNameOverrides.containsKey(aFieldDef.getName().toLowerCase())) {
+          extractGetterForColumn(normalizedColumns.get(columnNameOverrides.get(aFieldDef.getName().toLowerCase())
+              .toLowerCase()),aFieldDef);
+        }
+      }
+    }
+  }
+
+  /***
+   * Used to build a getter for the given schema column from the POJO field definition
+   * @param columnSchema The Kudu column definition
+   * @param fieldDefinition The POJO field definition
+   */
+  private void extractGetterForColumn(ColumnSchema columnSchema, Field fieldDefinition)
+  {
+    Type columnType = columnSchema.getType();
+    Class pojoClass = getTuplePayloadClass();
+    Object getter = null;
+    switch ( columnType ) {
+      case BINARY:
+        getter = PojoUtils.createGetter(pojoClass, fieldDefinition.getName(), ByteBuffer.class);
+        break;
+      case STRING:
+        getter = PojoUtils.createGetter(pojoClass, fieldDefinition.getName(), String.class);
+        break;
+      case BOOL:
+        getter = PojoUtils.createGetterBoolean(pojoClass, fieldDefinition.getName());
+        break;
+      case DOUBLE:
+        getter = PojoUtils.createGetterDouble(pojoClass, fieldDefinition.getName());
+        break;
+      case FLOAT:
+        getter = PojoUtils.createGetterFloat(pojoClass, fieldDefinition.getName());
+        break;
+      case INT8:
+        getter = PojoUtils.createGetterByte(pojoClass, fieldDefinition.getName());
+        break;
+      case INT16:
+        getter = PojoUtils.createGetterShort(pojoClass, fieldDefinition.getName());
+        break;
+      case INT32:
+        getter = PojoUtils.createGetterInt(pojoClass, fieldDefinition.getName());
+        break;
+      case INT64:
+      case UNIXTIME_MICROS:
+        getter = PojoUtils.createGetterLong(pojoClass, fieldDefinition.getName());
+        break;
+      default:
+        LOG.error(fieldDefinition.getName() + " has a data type that is not yet supported");
+        throw new UnsupportedOperationException(fieldDefinition.getName() + " does not have a compatible data type");
+    }
+    if (getter != null) {
+      kuduColumnBasedGetters.put(columnSchema.getName(),getter);
+    }
+  }
+
+
+  public static ApexKuduConnection.ApexKuduConnectionBuilder usingKuduConnectionBuilder()
+  {
+    return new ApexKuduConnection.ApexKuduConnectionBuilder();
+  }
+
+  public WindowDataManager getWindowDataManager()
+  {
+    return windowDataManager;
+  }
+
+  public void setWindowDataManager(WindowDataManager windowDataManager)
+  {
+    this.windowDataManager = windowDataManager;
+  }
+
+  /***
+   * Allows to map a POJO field name to a Kudu Table column name. This is useful in case
+   * the POJO field name can't be changed to an unconventional name ( ex: if kudu column names have underscores ). It
+   * can be also useful when the developer does not want to declare a new POJO but reuse an existing POJO.
+   * Note that the key in the map is the name of the field in the POJO and
+   * the value part is used to denote the name of the kudu column
+   * @return The map giving the mapping from POJO field name to the Kudu column name
+   */
+  protected Map<String,String> getOverridingColumnNameMap()
+  {
+    return new HashMap<>();
+  }
+
+  abstract ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig();
+
+  /***
+   * Represents the Tuple payload class that maps to a Kudu table row. Note that the POJO fields are mapped to the
+   * kudu column names is a lenient way. For example, the mapping of POJO field names to the kudu columns is done
+   * in a case-insensitive way.
+   * @return The class that will be used to map to a row in the given Kudu table.
+   */
+  protected abstract Class getTuplePayloadClass();
+
+  /***
+   * This is used to give control to the concrete implementation of this operator how to resolve whether to write a
+   * mutation into a given kudu table in the event of a failure and the operator subsequently resumes. This window
+   * is marked as a reconciling window. It is only for this reconciling window that we need to give control to the
+   * concrete operator implementor how to actually resolve if the entry needs to be excuted as a mutation in Kudu.
+   * @param executionContext The tuple which represents the execution context along with the payload
+   * @param reconcilingWindowId The window Id of the reconciling window
+   * @return true if we would like the entry to result in a mutation in the Kudu table.
+   */
+  protected abstract boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
+      long reconcilingWindowId);
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
new file mode 100644
index 0000000..aed6b8b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.SessionConfiguration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * <p>Represents a connection to the Kudu cluster. An instance of this class is to be supplied (via a builder pattern to)
+ * {@link AbstractKuduOutputOperator} to connect to a Kudu cluster.</p>
+ */
+
+public class ApexKuduConnection implements AutoCloseable, Serializable
+{
+  private static final long serialVersionUID = 4720185362997969198L;
+
+  private transient  KuduSession kuduSession;
+
+  private transient  KuduTable kuduTable;
+
+  private transient  KuduClient kuduClient;
+
+  public static final Logger LOG = LoggerFactory.getLogger(ApexKuduConnection.class);
+
+
+  private ApexKuduConnection(ApexKuduConnectionBuilder builder)
+  {
+    checkNotNull(builder,"Builder cannot be null to establish kudu session");
+    checkArgument(builder.mastersCollection.size() > 0, "Atleast one kudu master needs to be specified");
+    checkNotNull(builder.tableName,"Kudu table cannot be null");
+    KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(builder.mastersCollection);
+    if (builder.isOperationTimeOutSet) {
+      kuduClientBuilder.defaultOperationTimeoutMs(builder.operationTimeOutMs);
+    }
+    if (builder.isBossThreadCountSet) {
+      kuduClientBuilder.bossCount(builder.numBossThreads);
+    }
+    if (builder.isWorkerThreadsCountSet) {
+      kuduClientBuilder.workerCount(builder.workerThreads);
+    }
+    if (builder.isSocketReadTimeOutSet) {
+      kuduClientBuilder.defaultSocketReadTimeoutMs(builder.socketReadTimeOutMs);
+    }
+    kuduClient = kuduClientBuilder.build();
+    kuduSession = kuduClient.newSession();
+    if (builder.isFlushModeSet) {
+      kuduSession.setFlushMode(builder.flushMode);
+    }
+    if (builder.isExternalConsistencyModeSet) {
+      kuduSession.setExternalConsistencyMode(builder.externalConsistencyMode);
+    }
+    try {
+      if (!kuduClient.tableExists(builder.tableName)) {
+        throw new Exception("Table " + builder.tableName + " does not exist. ");
+      } else {
+        kuduTable = kuduClient.openTable(builder.tableName);
+      }
+    } catch (Exception e) {
+      LOG.error("Kudu table existence could not be ascertained  " + e.getMessage());
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void close() throws Exception
+  {
+    kuduSession.close();
+    kuduClient.close();
+  }
+
+  public KuduSession getKuduSession()
+  {
+    return kuduSession;
+  }
+
+  public void setKuduSession(KuduSession kuduSession)
+  {
+    this.kuduSession = kuduSession;
+  }
+
+  public KuduTable getKuduTable()
+  {
+    return kuduTable;
+  }
+
+  public void setKuduTable(KuduTable kuduTable)
+  {
+    this.kuduTable = kuduTable;
+  }
+
+  public KuduClient getKuduClient()
+  {
+    return kuduClient;
+  }
+
+  public void setKuduClient(KuduClient kuduClient)
+  {
+    this.kuduClient = kuduClient;
+  }
+
+  public static class ApexKuduConnectionBuilder
+  {
+    List<String> mastersCollection = new ArrayList<>();
+
+    String tableName;
+
+    // optional props
+    int numBossThreads = 1;
+
+    boolean isBossThreadCountSet = false;
+
+    int workerThreads = 2 *  Runtime.getRuntime().availableProcessors();
+
+    boolean isWorkerThreadsCountSet = false;
+
+    long socketReadTimeOutMs = 10000;
+
+    boolean isSocketReadTimeOutSet = false;
+
+    long operationTimeOutMs = 30000;
+
+    boolean isOperationTimeOutSet = false;
+
+    ExternalConsistencyMode externalConsistencyMode;
+
+    boolean isExternalConsistencyModeSet = false;
+
+    SessionConfiguration.FlushMode flushMode;
+
+    boolean isFlushModeSet = false;
+
+    public ApexKuduConnectionBuilder withTableName(String tableName)
+    {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public ApexKuduConnectionBuilder withAPossibleMasterHostAs(String masterHostAndPort)
+    {
+      mastersCollection.add(masterHostAndPort);
+      return this;
+    }
+
+    public ApexKuduConnectionBuilder withNumberOfBossThreads(int numberOfBossThreads)
+    {
+      this.numBossThreads = numberOfBossThreads;
+      isBossThreadCountSet = true;
+      return this;
+    }
+
+    public ApexKuduConnectionBuilder withNumberOfWorkerThreads(int numberOfWorkerThreads)
+    {
+      this.workerThreads = numberOfWorkerThreads;
+      isWorkerThreadsCountSet = true;
+      return this;
+    }
+
+    public ApexKuduConnectionBuilder withSocketReadTimeOutAs(long socketReadTimeOut)
+    {
+      this.socketReadTimeOutMs = socketReadTimeOut;
+      isSocketReadTimeOutSet = true;
+      return this;
+    }
+
+    public ApexKuduConnectionBuilder withOperationTimeOutAs(long operationTimeOut)
+    {
+      this.operationTimeOutMs = operationTimeOut;
+      isOperationTimeOutSet = true;
+      return this;
+    }
+
+    public ApexKuduConnectionBuilder withExternalConsistencyMode(ExternalConsistencyMode externalConsistencyMode)
+    {
+      this.externalConsistencyMode = externalConsistencyMode;
+      isExternalConsistencyModeSet = true;
+      return this;
+    }
+
+    public ApexKuduConnectionBuilder withFlushMode(SessionConfiguration.FlushMode flushMode)
+    {
+      this.flushMode = flushMode;
+      isFlushModeSet = true;
+      return this;
+    }
+
+    protected ApexKuduConnection build()
+    {
+      ApexKuduConnection apexKuduConnection = new ApexKuduConnection(this);
+      return apexKuduConnection;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
new file mode 100644
index 0000000..6de7190
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.SessionConfiguration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Provides a default implementation for writing tuples as Kudu rows.
+ * The user will have to either provide for
+ * <ol>
+ * <li>a property file containing properties like Kudu master list, Kudu table name and other connection properties.
+ * The operator will fail to launch if the properties file named kuduoutputoperator.properties is not locatable in the
+ *  root path</li>
+ * <li>Use the default constructor which supports minimum required properties as parameters</li>
+ * <li>In case of presence of multiple Kudu output operators in the same application, use the String based
+ * constructor which accepts a file name for each of the kudu table that the incoming pojo needs to be passivated
+ *  to</li>
+ * </ol>
+ * <p>
+ * The properties file will have to consist of the following keys:
+ * <br>masterhosts=<ip1:host>,<ip2:host>,..# Comma separated</br>
+ * <br>tablename=akudutablename</br>
+ * <br>pojoclassname=somepojoclasswithgettersandsetters; # Do not append name with .class at the end and
+ *  do not forget to give the complete class name including the package</br>
+ * </p>
+ */
+public class BaseKuduOutputOperator extends AbstractKuduOutputOperator
+{
+  public static final String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduoutputoperator.properties";
+
+  public static final String TABLE_NAME = "tablename";
+
+  public static final String MASTER_HOSTS = "masterhosts";
+
+  public static final String POJO_CLASS_NAME = "pojoclassname";
+
+  private Class pojoPayloadClass;
+
+  private ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder;
+
+  public BaseKuduOutputOperator() throws IOException, ClassNotFoundException
+  {
+    initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME);
+  }
+
+  public BaseKuduOutputOperator(String configFileInClasspath) throws IOException, ClassNotFoundException
+  {
+    initConnectionBuilderProperties(configFileInClasspath);
+  }
+
+  private void initConnectionBuilderProperties(String configFileInClasspath) throws IOException, ClassNotFoundException
+  {
+    Properties kuduConnectionProperties = new Properties();
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    InputStream kuduPropsFileAsStream = loader.getResourceAsStream(configFileInClasspath);
+    if (kuduPropsFileAsStream != null) {
+      kuduConnectionProperties.load(kuduPropsFileAsStream);
+    } else {
+      throw new IOException("Properties file required for Kudu connection " + configFileInClasspath +
+      " is not locatable in the root classpath");
+    }
+    String tableName = checkNotNull(kuduConnectionProperties.getProperty(TABLE_NAME));
+    String pojoClassName = checkNotNull(kuduConnectionProperties.getProperty(POJO_CLASS_NAME));
+    String masterHostsConnectionString = checkNotNull(kuduConnectionProperties.getProperty(MASTER_HOSTS));
+    String[] masterAndHosts = masterHostsConnectionString.split(",");
+    pojoPayloadClass = Class.forName(pojoClassName);
+    initKuduConfig(tableName, Arrays.asList(masterAndHosts));
+  }
+
+  private void initKuduConfig(String kuduTableName, List<String> kuduMasters)
+  {
+    apexKuduConnectionBuilder = new ApexKuduConnection.ApexKuduConnectionBuilder()
+      .withTableName(kuduTableName)
+      .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
+      .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
+      .withNumberOfBossThreads(1)
+      .withNumberOfWorkerThreads(2)
+      .withSocketReadTimeOutAs(3000)
+      .withOperationTimeOutAs(3000);
+    for ( String aMasterAndHost: kuduMasters ) {
+      apexKuduConnectionBuilder = apexKuduConnectionBuilder.withAPossibleMasterHostAs(aMasterAndHost);
+    }
+  }
+
+
+  public BaseKuduOutputOperator(String kuduTableName,List<String> kuduMasters, Class pojoPayloadClass)
+  {
+    this.pojoPayloadClass = pojoPayloadClass;
+    initKuduConfig(kuduTableName,kuduMasters);
+  }
+
+  @Override
+  ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig()
+  {
+    return apexKuduConnectionBuilder;
+  }
+
+  /**
+   * Can be used to further fine tune any of the connection configs once the constructor completes instantiating the
+   * Kudu Connection config builder.
+   * @return The Connection Config that would be used to initiate a connection to the Kudu Cluster once the operator is
+   * deserialized in the node manager managed container. See {@link AbstractKuduOutputOperator} activate() method for
+   * more details.
+   */
+  public ApexKuduConnection.ApexKuduConnectionBuilder getApexKuduConnectionBuilder()
+  {
+    return apexKuduConnectionBuilder;
+  }
+
+
+  /**
+   *
+   * @return The pojo class that would be streamed in the KuduExecutionContext
+   */
+  @Override
+  protected Class getTuplePayloadClass()
+  {
+    return pojoPayloadClass;
+  }
+
+  /**
+   * The default is to implement for ATLEAST_ONCE semantics. Override this control this behavior.
+   * @param executionContext The tuple which represents the execution context along with the payload
+   * @param reconcilingWindowId The window Id of the reconciling window
+   * @return
+   */
+  @Override
+  protected boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
+      long reconcilingWindowId)
+  {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
new file mode 100644
index 0000000..27d382d
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+
+/**
+ * <p>Represents a summary of the mutation that needs to be done on the Kudu table. The type of mutation is
+ * decided by the KuduMutation Type field. The actual data that is mutated inside the kudu table row is
+ * represented by the payload. The execution context itself a templated class based on the payload class.</p>
+ */
+public class KuduExecutionContext<T>
+{
+  private T payload;
+  /***
+   * <p>Represents the set of columns that are not to be written to a Kudu row. Note that this is useful when we
+   * would like to not write a set of columns into the table either because
+   * <ol>
+   * <li>We are doing an update and we would like to update only a few of the columns as the original columns were
+   * already written in the original insert mutation</li>
+   * <li>When we would like to not write a column because the column is an optional column as per the schema definition
+   * </li>
+   * </ol>
+   * It may be noted that the client driver will throw an exception when a mandatory column is not written.</p>
+   */
+  private Set<String> doNotWriteColumns = new HashSet<>();
+
+  private KuduMutationType mutationType = KuduMutationType.UPSERT;
+
+  private ExternalConsistencyMode externalConsistencyMode;
+
+  private long propagatedTimestamp;
+
+  public T getPayload()
+  {
+    return payload;
+  }
+
+  public void setPayload(T payload)
+  {
+    this.payload = payload;
+  }
+
+  public KuduMutationType getMutationType()
+  {
+    return mutationType;
+  }
+
+  public void setMutationType(KuduMutationType mutationType)
+  {
+    this.mutationType = mutationType;
+  }
+
+  public ExternalConsistencyMode getExternalConsistencyMode()
+  {
+    return externalConsistencyMode;
+  }
+
+  public void setExternalConsistencyMode(ExternalConsistencyMode externalConsistencyMode)
+  {
+    this.externalConsistencyMode = externalConsistencyMode;
+  }
+
+  public long getPropagatedTimestamp()
+  {
+    return propagatedTimestamp;
+  }
+
+  public void setPropagatedTimestamp(long propagatedTimestamp)
+  {
+    this.propagatedTimestamp = propagatedTimestamp;
+  }
+
+  public Set<String> getDoNotWriteColumns()
+  {
+    return doNotWriteColumns;
+  }
+
+  public void setDoNotWriteColumns(Set<String> doNotWriteColumns)
+  {
+    this.doNotWriteColumns = doNotWriteColumns;
+  }
+
+  public void addDoNotWriteColumn(String aKuduColumnName)
+  {
+    doNotWriteColumns.add(aKuduColumnName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
new file mode 100644
index 0000000..64b46c6
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+/**
+ * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
+ * mutation being represented by the current tuple</p>
+ */
+public enum KuduMutationType
+{
+
+    INSERT,
+    DELETE,
+    UPDATE,
+    UPSERT
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
new file mode 100644
index 0000000..615427a
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.helper.TestPortContext;
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+import static org.junit.Assert.assertEquals;
+
+
+public class KuduCreateUpdateDeleteOutputOperatorTest
+{
+
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(KuduCreateUpdateDeleteOutputOperatorTest.class);
+
+  private static final String tableName = "unittests";
+
+  private final String APP_ID = "TestKuduOutputOperator";
+
+  private final int OPERATOR_ID_FOR_KUDU_CRUD = 0;
+
+  private static KuduClient kuduClient;
+
+  private static KuduTable kuduTable;
+
+  private static Map<String,ColumnSchema> columnDefs = new HashMap<>();
+
+  private BaseKuduOutputOperator simpleKuduOutputOperator;
+
+  private OperatorContext contextForKuduOutputOperator;
+
+  private TestPortContext testPortContextForKuduOutput;
+
+  @BeforeClass
+  public static void setup() throws Exception
+  {
+    kuduClient = getClientHandle();
+    if (kuduClient.tableExists(tableName)) {
+      kuduClient.deleteTable(tableName);
+    }
+    createTestTable(tableName,kuduClient);
+    kuduTable = kuduClient.openTable(tableName);
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception
+  {
+    kuduClient.close();
+  }
+
+  @Before
+  public void setUpKuduOutputOperatorContext() throws Exception
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    contextForKuduOutputOperator = mockOperatorContext(OPERATOR_ID_FOR_KUDU_CRUD, attributeMap);
+    simpleKuduOutputOperator = new BaseKuduOutputOperator();
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, UnitTestTablePojo.class);
+    testPortContextForKuduOutput = new TestPortContext(portAttributes);
+    simpleKuduOutputOperator.setup(contextForKuduOutputOperator);
+    simpleKuduOutputOperator.activate(contextForKuduOutputOperator);
+    simpleKuduOutputOperator.input.setup(testPortContextForKuduOutput);
+  }
+
+  private static KuduClient getClientHandle() throws Exception
+  {
+    KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder("localhost:7051");
+    KuduClient client = builder.build();
+    return client;
+  }
+
+  private static void createTestTable(String tableName, KuduClient client) throws Exception
+  {
+    List<ColumnSchema> columns = new ArrayList<>();
+    ColumnSchema intRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("introwkey", Type.INT32)
+        .key(true)
+        .build();
+    columns.add(intRowKeyCol);
+    columnDefs.put("introwkey",intRowKeyCol);
+    ColumnSchema stringRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("stringrowkey", Type.STRING)
+        .key(true)
+        .build();
+    columns.add(stringRowKeyCol);
+    columnDefs.put("stringrowkey",stringRowKeyCol);
+    ColumnSchema timestampRowKey = new ColumnSchema.ColumnSchemaBuilder("timestamprowkey", Type.UNIXTIME_MICROS)
+        .key(true)
+        .build();
+    columns.add(timestampRowKey);
+    columnDefs.put("timestamprowkey",timestampRowKey);
+    ColumnSchema longData = new ColumnSchema.ColumnSchemaBuilder("longdata", Type.INT64)
+        .build();
+    columns.add(longData);
+    columnDefs.put("longdata",longData);
+    ColumnSchema stringData = new ColumnSchema.ColumnSchemaBuilder("stringdata", Type.STRING)
+        .build();
+    columns.add(stringData);
+    columnDefs.put("stringdata",stringData);
+    ColumnSchema timestampdata = new ColumnSchema.ColumnSchemaBuilder("timestampdata", Type.UNIXTIME_MICROS)
+        .build();
+    columns.add(timestampdata);
+    columnDefs.put("timestampdata",timestampdata);
+    ColumnSchema binarydata = new ColumnSchema.ColumnSchemaBuilder("binarydata", Type.BINARY)
+        .build();
+    columns.add(binarydata);
+    columnDefs.put("binarydata",binarydata);
+    ColumnSchema floatdata = new ColumnSchema.ColumnSchemaBuilder("floatdata", Type.FLOAT)
+        .build();
+    columns.add(floatdata);
+    columnDefs.put("floatdata",floatdata);
+    ColumnSchema booldata = new ColumnSchema.ColumnSchemaBuilder("booldata", Type.BOOL)
+        .build();
+    columns.add(booldata);
+    columnDefs.put("booldata",booldata);
+    List<String> rangeKeys = new ArrayList<>();
+    rangeKeys.add("stringrowkey");
+    rangeKeys.add("timestamprowkey");
+    List<String> hashPartitions = new ArrayList<>();
+    hashPartitions.add("introwkey");
+    Schema schema = new Schema(columns);
+    try {
+      client.createTable(tableName, schema,
+          new CreateTableOptions()
+          .setNumReplicas(1)
+          .setRangePartitionColumns(rangeKeys)
+          .addHashPartitions(hashPartitions,2));
+    } catch (KuduException e) {
+      LOG.error("Error while creating table for unit tests " + e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  private void lookUpAndPopulateRecord(UnitTestTablePojo keyInfo) throws Exception
+  {
+    KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable)
+        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("introwkey"),
+        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getIntrowkey()))
+        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("stringrowkey"),
+        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getStringrowkey()))
+        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("timestamprowkey"),
+        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getTimestamprowkey()))
+        .build();
+    RowResultIterator rowResultItr = scanner.nextRows();
+    while (rowResultItr.hasNext()) {
+      RowResult thisRow = rowResultItr.next();
+      keyInfo.setFloatdata(thisRow.getFloat("floatdata"));
+      keyInfo.setBooldata(thisRow.getBoolean("booldata"));
+      keyInfo.setBinarydata(thisRow.getBinary("binarydata"));
+      keyInfo.setLongdata(thisRow.getLong("longdata"));
+      keyInfo.setTimestampdata(thisRow.getLong("timestampdata"));
+      keyInfo.setStringdata("stringdata");
+      break;
+    }
+  }
+
+  @Test
+  public void processForUpdate() throws Exception
+  {
+    KuduExecutionContext<UnitTestTablePojo> newInsertExecutionContext = new KuduExecutionContext<>();
+    UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+    unitTestTablePojo.setIntrowkey(2);
+    unitTestTablePojo.setStringrowkey("two" + System.currentTimeMillis());
+    unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+    unitTestTablePojo.setBooldata(true);
+    unitTestTablePojo.setFloatdata(3.2f);
+    unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+    unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+    unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+    unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+    newInsertExecutionContext.setMutationType(KuduMutationType.INSERT);
+    newInsertExecutionContext.setPayload(unitTestTablePojo);
+
+    simpleKuduOutputOperator.beginWindow(1);
+    simpleKuduOutputOperator.input.process(newInsertExecutionContext);
+    KuduExecutionContext<UnitTestTablePojo> updateExecutionContext = new KuduExecutionContext<>();
+    UnitTestTablePojo updatingRecord = new UnitTestTablePojo();
+    updateExecutionContext.setMutationType(KuduMutationType.UPDATE);
+    updatingRecord.setBooldata(false);
+    updatingRecord.setIntrowkey(unitTestTablePojo.getIntrowkey());
+    updatingRecord.setStringrowkey(unitTestTablePojo.getStringrowkey());
+    updatingRecord.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+    updateExecutionContext.setPayload(updatingRecord);
+    simpleKuduOutputOperator.input.process(updateExecutionContext);
+    simpleKuduOutputOperator.endWindow();
+
+    UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+    unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+    unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+    unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+    lookUpAndPopulateRecord(unitTestTablePojoRead);
+    assertEquals(unitTestTablePojoRead.isBooldata(), false);
+  }
+
+  @Test
+  public void processForUpsert() throws Exception
+  {
+    KuduExecutionContext<UnitTestTablePojo> upsertExecutionContext = new KuduExecutionContext<>();
+    UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+    unitTestTablePojo.setIntrowkey(3);
+    unitTestTablePojo.setStringrowkey("three" + System.currentTimeMillis());
+    unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+    unitTestTablePojo.setBooldata(false);
+    unitTestTablePojo.setFloatdata(3.2f);
+    unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+    unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+    unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+    unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+    upsertExecutionContext.setMutationType(KuduMutationType.UPSERT);
+    upsertExecutionContext.setPayload(unitTestTablePojo);
+
+    simpleKuduOutputOperator.beginWindow(2);
+    simpleKuduOutputOperator.input.process(upsertExecutionContext);
+    upsertExecutionContext.setMutationType(KuduMutationType.UPSERT);
+    unitTestTablePojo.setBooldata(true);
+    simpleKuduOutputOperator.input.process(upsertExecutionContext);
+    simpleKuduOutputOperator.endWindow();
+
+    UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+    unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+    unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+    unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+    lookUpAndPopulateRecord(unitTestTablePojoRead);
+    assertEquals(unitTestTablePojoRead.isBooldata(), true);
+  }
+
+  @Test
+  public void processForDelete() throws Exception
+  {
+    KuduExecutionContext<UnitTestTablePojo> insertExecutionContext = new KuduExecutionContext<>();
+    UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+    unitTestTablePojo.setIntrowkey(4);
+    unitTestTablePojo.setStringrowkey("four" + System.currentTimeMillis());
+    unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+    unitTestTablePojo.setBooldata(false);
+    unitTestTablePojo.setFloatdata(3.2f);
+    unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+    unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+    unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+    unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+    insertExecutionContext.setMutationType(KuduMutationType.INSERT);
+    insertExecutionContext.setPayload(unitTestTablePojo);
+
+    simpleKuduOutputOperator.beginWindow(3);
+    simpleKuduOutputOperator.input.process(insertExecutionContext);
+    KuduExecutionContext<UnitTestTablePojo> deleteExecutionContext = new KuduExecutionContext<>();
+    UnitTestTablePojo unitTestTablePojoDelete = new UnitTestTablePojo();
+    unitTestTablePojoDelete.setIntrowkey(unitTestTablePojo.getIntrowkey());
+    unitTestTablePojoDelete.setStringrowkey(unitTestTablePojo.getStringrowkey());
+    unitTestTablePojoDelete.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+    deleteExecutionContext.setMutationType(KuduMutationType.DELETE);
+    deleteExecutionContext.setPayload(unitTestTablePojoDelete);
+    simpleKuduOutputOperator.input.process(deleteExecutionContext);
+    simpleKuduOutputOperator.endWindow();
+
+    UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+    unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+    unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+    unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+    lookUpAndPopulateRecord(unitTestTablePojoRead);
+    assertEquals(unitTestTablePojoRead.getBinarydata(), null);
+  }
+
+  @Test
+  public void processForInsert() throws Exception
+  {
+    KuduExecutionContext<UnitTestTablePojo> insertExecutionContext = new KuduExecutionContext<>();
+    UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+    unitTestTablePojo.setIntrowkey(1);
+    unitTestTablePojo.setStringrowkey("one" + System.currentTimeMillis());
+    unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+    unitTestTablePojo.setBooldata(true);
+    unitTestTablePojo.setFloatdata(3.2f);
+    unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+    unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+    unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+    unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+    insertExecutionContext.setMutationType(KuduMutationType.INSERT);
+    insertExecutionContext.setPayload(unitTestTablePojo);
+
+    simpleKuduOutputOperator.beginWindow(0);
+    simpleKuduOutputOperator.input.process(insertExecutionContext);
+    simpleKuduOutputOperator.endWindow();
+
+    UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+    unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+    unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+    unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+    lookUpAndPopulateRecord(unitTestTablePojoRead);
+    assertEquals("" + unitTestTablePojoRead.getFloatdata(),"" + unitTestTablePojo.getFloatdata());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
new file mode 100644
index 0000000..3933df7
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.SessionConfiguration;
+
+public class SimpleKuduOutputOperator extends AbstractKuduOutputOperator
+{
+  @Override
+  ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig()
+  {
+    return new ApexKuduConnection.ApexKuduConnectionBuilder()
+        .withAPossibleMasterHostAs("localhost:7051")
+        .withTableName("unittests")
+        .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
+        .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
+        .withNumberOfBossThreads(1)
+        .withNumberOfWorkerThreads(2)
+        .withSocketReadTimeOutAs(3000)
+        .withOperationTimeOutAs(3000);
+  }
+
+  @Override
+  protected boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
+      long reconcilingWindowId)
+  {
+    return true;
+  }
+
+  @Override
+  protected Class getTuplePayloadClass()
+  {
+    return UnitTestTablePojo.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
new file mode 100644
index 0000000..dc2cc33
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+
+import java.nio.ByteBuffer;
+
+public class UnitTestTablePojo
+{
+  private int introwkey;
+  private String stringrowkey;
+  private long timestamprowkey;
+  private long longdata;
+  private String stringdata;
+  private long timestampdata;
+  private ByteBuffer binarydata;
+  private float floatdata;
+  private boolean booldata;
+
+  public int getIntrowkey()
+  {
+    return introwkey;
+  }
+
+  public void setIntrowkey(int introwkey)
+  {
+    this.introwkey = introwkey;
+  }
+
+  public String getStringrowkey()
+  {
+    return stringrowkey;
+  }
+
+  public void setStringrowkey(String stringrowkey)
+  {
+    this.stringrowkey = stringrowkey;
+  }
+
+  public long getTimestamprowkey()
+  {
+    return timestamprowkey;
+  }
+
+  public void setTimestamprowkey(long timestamprowkey)
+  {
+    this.timestamprowkey = timestamprowkey;
+  }
+
+  public long getLongdata()
+  {
+    return longdata;
+  }
+
+  public void setLongdata(long longdata)
+  {
+    this.longdata = longdata;
+  }
+
+  public String getStringdata()
+  {
+    return stringdata;
+  }
+
+  public void setStringdata(String stringdata)
+  {
+    this.stringdata = stringdata;
+  }
+
+  public long getTimestampdata()
+  {
+    return timestampdata;
+  }
+
+  public void setTimestampdata(long timestampdata)
+  {
+    this.timestampdata = timestampdata;
+  }
+
+  public ByteBuffer getBinarydata()
+  {
+    return binarydata;
+  }
+
+  public void setBinarydata(ByteBuffer binarydata)
+  {
+    this.binarydata = binarydata;
+  }
+
+  public float getFloatdata()
+  {
+    return floatdata;
+  }
+
+  public void setFloatdata(float floatdata)
+  {
+    this.floatdata = floatdata;
+  }
+
+  public boolean isBooldata()
+  {
+    return booldata;
+  }
+
+  public void setBooldata(boolean booldata)
+  {
+    this.booldata = booldata;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/resources/kuduoutputoperator.properties
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/kuduoutputoperator.properties b/contrib/src/test/resources/kuduoutputoperator.properties
new file mode 100644
index 0000000..8f41c63
--- /dev/null
+++ b/contrib/src/test/resources/kuduoutputoperator.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+masterhosts=192.168.2.141:7051,192.168.2.133:7051
+tablename=unittests
+pojoclassname=org.apache.apex.malhar.contrib.kudu.UnitTestTablePojo
\ No newline at end of file