You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/05/03 21:22:03 UTC
apex-malhar git commit:
APEXMALHAR-2278.KuduNonTransactionalOutputOperator
Repository: apex-malhar
Updated Branches:
refs/heads/master 2fe2903bf -> 10dd94ef5
APEXMALHAR-2278.KuduNonTransactionalOutputOperator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/10dd94ef
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/10dd94ef
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/10dd94ef
Branch: refs/heads/master
Commit: 10dd94ef56e81a795a9a7295e74b686ffd79b255
Parents: 2fe2903
Author: Ananth <an...@gmail.com>
Authored: Thu May 4 06:19:05 2017 +1000
Committer: Ananth <an...@gmail.com>
Committed: Thu May 4 06:19:05 2017 +1000
----------------------------------------------------------------------
contrib/pom.xml | 6 +
.../kudu/AbstractKuduOutputOperator.java | 660 +++++++++++++++++++
.../malhar/contrib/kudu/ApexKuduConnection.java | 223 +++++++
.../contrib/kudu/BaseKuduOutputOperator.java | 158 +++++
.../contrib/kudu/KuduExecutionContext.java | 109 +++
.../malhar/contrib/kudu/KuduMutationType.java | 32 +
...uduCreateUpdateDeleteOutputOperatorTest.java | 336 ++++++++++
.../contrib/kudu/SimpleKuduOutputOperator.java | 52 ++
.../malhar/contrib/kudu/UnitTestTablePojo.java | 125 ++++
.../resources/kuduoutputoperator.properties | 22 +
10 files changed, 1723 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 893ec2d..83305cb 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -678,5 +678,11 @@
<artifactId>jackson-databind</artifactId>
<version>2.7.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>1.3.0</version>
+ <optional>true</optional>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
new file mode 100644
index 0000000..250334b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
@@ -0,0 +1,660 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Statistics;
+import org.apache.kudu.client.Update;
+import org.apache.kudu.client.Upsert;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.PojoUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An Abstract operator that would allow concrete implementations to write a POJO value into a given Kudu table.
+ * <p>
+ * To use this operator, the following needs to be done by the implementor
+ * <ol>
+ * <li>Create a concrete implementation of this operator and implement the method to define the connection
+ * properties to Kudu. The connection properties are set using the {@link ApexKuduConnection} using a builder pattern.
+ * </li>
+ * <li>Implement the logic how tuples need to ne handled in the event of a reconciliation phase ( i.e. when an
+ * operator is resuming back from failure ). See javadoc of the method for more details.</li>
+ * <li>Define the payload class</li>
+ * </ol>
+ * </p>
+ * <p>
+ * Note that the tuple that is getting processed is not the POJO class. The tuple that is getting processed is the
+ * {@link KuduExecutionContext} instance. This is because the operator supports mutation types as a higher level
+ * construct than simply writing a POJO into a Kudu table row.
+ * </p>
+ * <p>
+ * Supported mutations are:
+ * <ol>
+ * <li>INSERT</li>
+ * <li>UPDATE</li>
+ * <li>UPSERT</li>
+ * <li>DELETE</li>
+ * </ol>
+ * </p>
+ * <p>
+ * Please note that the Update mutation allows to change a subset of columns only even though there might be columns
+ * that were defined to be non-nullable. This is because the original mutation of type insert would have written the
+ * mandatory columns. In such scenarios, the method setDoNotWriteColumns() in {@link KuduExecutionContext} can be
+ * used to specify only those columns that need an update. This ways a read and Update pattern can be merged to a
+ * simple update pattern thus avoiding a read if required.</p>
+ * */
+@InterfaceStability.Evolving
+public abstract class AbstractKuduOutputOperator extends BaseOperator
+ implements Operator.ActivationListener<Context.OperatorContext>,Operator.CheckpointNotificationListener
+{
+
+ private transient ApexKuduConnection apexKuduConnection;
+
+ private transient KuduTable kuduTable;
+
+ private transient KuduSession kuduSession;
+
+ private transient KuduClient kuduClientHandle;
+
+ private transient Map<String,ColumnSchema> allColumnDefs;
+
+ private transient Map<String,Object> kuduColumnBasedGetters;
+
+ private Set<String> primaryKeyColumnNames;
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(AbstractKuduOutputOperator.class);
+
+ @NotNull
+ private WindowDataManager windowDataManager;
+
+ private transient long currentWindowId;
+
+ private transient boolean isInReplayMode;
+
+ private transient boolean isInReconcilingMode;
+
+ private transient long reconcilingWindowId;
+
+ @AutoMetric
+ transient long numInserts;
+
+ @AutoMetric
+ transient long numUpserts;
+
+ @AutoMetric
+ transient long numDeletes;
+
+ @AutoMetric
+ transient long numUpdates;
+
+ @AutoMetric
+ transient long numOpsErrors;
+
+ @AutoMetric
+ transient long numBytesWritten;
+
+ @AutoMetric
+ transient long numRpcErrors;
+
+ @AutoMetric
+ transient long numWriteOps;
+
+ @AutoMetric
+ transient long numWriteRPCs;
+
+ @AutoMetric
+ long totalOpsErrors = 0;
+
+ @AutoMetric
+ long totalBytesWritten = 0;
+
+ @AutoMetric
+ long totalRpcErrors = 0;
+
+ @AutoMetric
+ long totalWriteOps = 0;
+
+ @AutoMetric
+ long totalWriteRPCs = 0;
+
+ @AutoMetric
+ long totalInsertsSinceStart;
+
+ @AutoMetric
+ long totalUpsertsSinceStart;
+
+ @AutoMetric
+ long totalDeletesSinceStart;
+
+ @AutoMetric
+ long totalUpdatesSinceStart;
+
+ public final transient DefaultInputPort<KuduExecutionContext> input = new DefaultInputPort<KuduExecutionContext>()
+ {
+ @Override
+ public void process(KuduExecutionContext kuduExecutionContext)
+ {
+ processTuple(kuduExecutionContext);
+ }
+ }; // end input port implementation
+
+ public void processTuple(KuduExecutionContext kuduExecutionContext)
+ {
+ if ( isInReconcilingMode || isInReplayMode) {
+ if ( !isEligibleForPassivationInReconcilingWindow(kuduExecutionContext, currentWindowId)) {
+ return;
+ }
+ }
+ KuduMutationType mutationType = kuduExecutionContext.getMutationType();
+ switch (mutationType) {
+ case DELETE:
+ processForDelete(kuduExecutionContext);
+ numDeletes += 1;
+ totalDeletesSinceStart += 1;
+ break;
+ case INSERT:
+ processForInsert(kuduExecutionContext);
+ numInserts += 1;
+ totalInsertsSinceStart += 1;
+ break;
+ case UPDATE:
+ processForUpdate(kuduExecutionContext);
+ numUpdates += 1;
+ totalUpdatesSinceStart += 1;
+ break;
+ case UPSERT:
+ processForUpsert(kuduExecutionContext);
+ numUpserts += 1;
+ totalUpsertsSinceStart += 1;
+ break;
+ default:
+ break;
+ }
+ }
+
+ /***
+ * Sets the values from the Pojo into the Kudu mutation object.
+ * @param currentOperation The operation instance that represents the current mutation. This will be applied to the
+ * current session
+ * @param kuduExecutionContext The tuple that contains the payload as well as other information like mutation type etc
+ */
+ @SuppressWarnings(value = "unchecked")
+ private void performCommonProcessing(Operation currentOperation, KuduExecutionContext kuduExecutionContext)
+ {
+ currentOperation.setExternalConsistencyMode(kuduExecutionContext.getExternalConsistencyMode());
+ currentOperation.setPropagatedTimestamp(kuduExecutionContext.getPropagatedTimestamp());
+ PartialRow partialRow = currentOperation.getRow();
+ Object payload = kuduExecutionContext.getPayload();
+ Set<String> doNotWriteColumns = kuduExecutionContext.getDoNotWriteColumns();
+ if (doNotWriteColumns == null) {
+ doNotWriteColumns = new HashSet<>();
+ }
+ for (String columnName: kuduColumnBasedGetters.keySet()) {
+ if ( doNotWriteColumns.contains(columnName)) {
+ continue;
+ }
+ ColumnSchema columnSchema = allColumnDefs.get(columnName);
+ Type dataType = columnSchema.getType();
+ try {
+ switch (dataType) {
+ case STRING:
+ PojoUtils.Getter<Object, String> stringGetter = ((PojoUtils.Getter<Object, String>)kuduColumnBasedGetters
+ .get(columnName));
+ if (stringGetter != null) {
+ final String stringValue = stringGetter.get(payload);
+ if (stringValue != null) {
+ partialRow.addString(columnName, stringValue);
+ }
+ }
+ break;
+ case BINARY:
+ PojoUtils.Getter<Object, ByteBuffer> byteBufferGetter = ((PojoUtils.Getter<Object, ByteBuffer>)
+ kuduColumnBasedGetters.get(columnName));
+ if (byteBufferGetter != null) {
+ final ByteBuffer byteBufferValue = byteBufferGetter.get(payload);
+ if (byteBufferValue != null) {
+ partialRow.addBinary(columnName, byteBufferValue);
+ }
+ }
+ break;
+ case BOOL:
+ PojoUtils.GetterBoolean<Object> boolGetter = ((PojoUtils.GetterBoolean<Object>)kuduColumnBasedGetters.get(
+ columnName));
+ if (boolGetter != null) {
+ final boolean boolValue = boolGetter.get(payload);
+ partialRow.addBoolean(columnName, boolValue);
+ }
+ break;
+ case DOUBLE:
+ PojoUtils.GetterDouble<Object> doubleGetter = ((PojoUtils.GetterDouble<Object>)kuduColumnBasedGetters.get(
+ columnName));
+ if (doubleGetter != null) {
+ final double doubleValue = doubleGetter.get(payload);
+ partialRow.addDouble(columnName, doubleValue);
+ }
+ break;
+ case FLOAT:
+ PojoUtils.GetterFloat<Object> floatGetter = ((PojoUtils.GetterFloat<Object>)kuduColumnBasedGetters.get(
+ columnName));
+ if (floatGetter != null) {
+ final float floatValue = floatGetter.get(payload);
+ partialRow.addFloat(columnName, floatValue);
+ }
+ break;
+ case INT8:
+ PojoUtils.GetterByte<Object> byteGetter = ((PojoUtils.GetterByte<Object>)kuduColumnBasedGetters.get(
+ columnName));
+ if (byteGetter != null) {
+ final byte byteValue = byteGetter.get(payload);
+ partialRow.addByte(columnName, byteValue);
+ }
+ break;
+ case INT16:
+ PojoUtils.GetterShort<Object> shortGetter = ((PojoUtils.GetterShort<Object>)kuduColumnBasedGetters.get(
+ columnName));
+ if (shortGetter != null) {
+ final short shortValue = shortGetter.get(payload);
+ partialRow.addShort(columnName, shortValue);
+ }
+ break;
+ case INT32:
+ PojoUtils.GetterInt<Object> intGetter = ((PojoUtils.GetterInt<Object>)
+ kuduColumnBasedGetters.get(columnName));
+ if (intGetter != null) {
+ final int intValue = intGetter.get(payload);
+ partialRow.addInt(columnName, intValue);
+ }
+ break;
+ case INT64:
+ case UNIXTIME_MICROS:
+ PojoUtils.GetterLong<Object> longGetter = ((PojoUtils.GetterLong<Object>)kuduColumnBasedGetters.get(
+ columnName));
+ if (longGetter != null) {
+ final long longValue = longGetter.get(payload);
+ partialRow.addLong(columnName, longValue);
+ }
+ break;
+ default:
+ LOG.error(columnName + " is not of the supported data type");
+ throw new UnsupportedOperationException("Kudu does not support data type for column " + columnName);
+ }
+ } catch ( Exception ex ) {
+ LOG.error(" Exception while fetching the value of " + columnName + " because " + ex.getMessage());
+ partialRow.setNull(columnName);
+ }
+ }
+ try {
+ kuduSession.apply(currentOperation);
+ } catch (KuduException e) {
+ LOG.error("Could not execute operation because " + e.getMessage(), e);
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ protected void processForUpdate(KuduExecutionContext kuduExecutionContext)
+ {
+ Update thisUpdate = kuduTable.newUpdate();
+ performCommonProcessing(thisUpdate,kuduExecutionContext);
+ }
+
+
+ protected void processForUpsert(KuduExecutionContext kuduExecutionContext)
+ {
+ Upsert thisUpsert = kuduTable.newUpsert();
+ performCommonProcessing(thisUpsert,kuduExecutionContext);
+ }
+
+
+
+ protected void processForDelete(KuduExecutionContext kuduExecutionContext)
+ {
+ Delete thisDelete = kuduTable.newDelete();
+ // Kudu does not allow column values to be set in case of a delete mutation
+ Set<String> doNotWriteCols = kuduExecutionContext.getDoNotWriteColumns();
+ if ( doNotWriteCols == null) {
+ doNotWriteCols = new HashSet<>();
+ }
+ doNotWriteCols.clear();
+ for (String columnName : allColumnDefs.keySet()) {
+ if ( !(primaryKeyColumnNames.contains(columnName))) {
+ doNotWriteCols.add(columnName);
+ }
+ }
+ kuduExecutionContext.setDoNotWriteColumns(doNotWriteCols);
+ performCommonProcessing(thisDelete,kuduExecutionContext);
+ }
+
+
+ protected void processForInsert(KuduExecutionContext kuduExecutionContext)
+ {
+ Insert thisInsert = kuduTable.newInsert();
+ performCommonProcessing(thisInsert,kuduExecutionContext);
+ }
+
+ @Override
+ public void activate(Context.OperatorContext context)
+ {
+ ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder = getKuduConnectionConfig();
+ apexKuduConnection = apexKuduConnectionBuilder.build();
+ checkNotNull(apexKuduConnection,"Kudu connection cannot be null");
+ kuduTable = apexKuduConnection.getKuduTable();
+ kuduSession = apexKuduConnection.getKuduSession();
+ kuduClientHandle = apexKuduConnection.getKuduClient();
+ checkNotNull(kuduTable,"Kudu Table cannot be null");
+ checkNotNull(kuduSession, "Kudu session cannot be null");
+ allColumnDefs = new HashMap();
+ primaryKeyColumnNames = new HashSet<>();
+ kuduColumnBasedGetters = new HashMap();
+ buildGettersForPojoPayload();
+ reconcilingWindowId = Stateless.WINDOW_ID;
+ // The operator is working in a replay mode where the upstream buffer is re-streaming the tuples
+ // Note that there are two windows that need special core. The window that is being replayed and the subsequent
+ // window that might have resulted in a crash which we are referring as reconciling window
+ if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) &&
+ context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) <
+ windowDataManager.getLargestCompletedWindow()) {
+ reconcilingWindowId = windowDataManager.getLargestCompletedWindow() + 1;
+ }
+
+ if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) &&
+ context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) ==
+ windowDataManager.getLargestCompletedWindow()) {
+ reconcilingWindowId = windowDataManager.getLargestCompletedWindow();
+ }
+ }
+
+ @Override
+ public void deactivate()
+ {
+ try {
+ apexKuduConnection.close();
+ } catch (Exception e) {
+ LOG.error("Could not close kudu session and resources because " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long l)
+ {
+ // Nothing to be done here. Child classes can use this if required
+ }
+
+ @Override
+ public void checkpointed(long l)
+ {
+ // Nothing to be done here. Child classes can use this if required
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ windowDataManager.committed(windowId);
+ } catch (IOException e) {
+ LOG.error("Error while committing the window id " + windowId + " because " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ windowDataManager = getWindowDataManager();
+ if ( windowDataManager == null) {
+ windowDataManager = new FSWindowDataManager();
+ }
+ windowDataManager.setup(context);
+ }
+
+ @Override
+ public void teardown()
+ {
+ windowDataManager.teardown();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ currentWindowId = windowId;
+ if ( currentWindowId != Stateless.WINDOW_ID) { // if it is not the first window of the application
+ if (currentWindowId > reconcilingWindowId) {
+ isInReplayMode = false;
+ isInReconcilingMode = false;
+ }
+ if (currentWindowId == reconcilingWindowId) {
+ isInReconcilingMode = true;
+ isInReplayMode = false;
+ }
+ if (currentWindowId < reconcilingWindowId) {
+ isInReconcilingMode = false;
+ isInReplayMode = true;
+ }
+ }
+ numDeletes = 0;
+ numInserts = 0;
+ numUpdates = 0;
+ numUpserts = 0;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ try {
+ kuduSession.flush();
+ } catch (KuduException e) {
+ LOG.error("Could not flush kudu session on an end window boundary " + e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ if ( currentWindowId > windowDataManager.getLargestCompletedWindow()) {
+ try {
+ windowDataManager.save(currentWindowId,currentWindowId);
+ } catch (IOException e) {
+ LOG.error("Error while persisting the current window state " + currentWindowId + " because " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+ numOpsErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.OPS_ERRORS) -
+ totalOpsErrors;
+ numBytesWritten = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.BYTES_WRITTEN) -
+ totalBytesWritten;
+ numRpcErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.RPC_ERRORS) -
+ totalRpcErrors;
+ numWriteOps = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_OPS) -
+ totalWriteOps;
+ numWriteRPCs = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_RPCS) - totalWriteOps;
+ totalOpsErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.OPS_ERRORS);
+ totalBytesWritten = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.BYTES_WRITTEN);
+ totalRpcErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.RPC_ERRORS);
+ totalWriteOps = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_OPS);
+ totalWriteRPCs = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_RPCS);
+ }
+
+ private void buildGettersForPojoPayload()
+ {
+ Class payloadClass = getTuplePayloadClass();
+ checkNotNull(payloadClass,"Payload class cannot be null");
+ Field[] classFields = payloadClass.getDeclaredFields();
+ Schema schemaInfo = kuduTable.getSchema();
+ List<ColumnSchema> allColumns = schemaInfo.getColumns();
+ Set<String> allKuduTableColumnNames = new HashSet<>();
+ Map<String,ColumnSchema> normalizedColumns = new HashMap();
+ for ( ColumnSchema aColumnDef : allColumns) {
+ allColumnDefs.put(aColumnDef.getName(), aColumnDef);
+ normalizedColumns.put(aColumnDef.getName().toLowerCase(), aColumnDef);
+ allKuduTableColumnNames.add(aColumnDef.getName().toLowerCase());
+ }
+ List<ColumnSchema> primaryKeyColumns = schemaInfo.getPrimaryKeyColumns();
+ for (ColumnSchema primaryKeyInfo : primaryKeyColumns) {
+ primaryKeyColumnNames.add(primaryKeyInfo.getName());
+ }
+ Map<String,String> columnNameOverrides = getOverridingColumnNameMap();
+ if (columnNameOverrides == null) {
+ columnNameOverrides = new HashMap(); // to avoid null checks further down the line
+ }
+ for ( Field aFieldDef : classFields) {
+ String currentFieldName = aFieldDef.getName().toLowerCase();
+ if (allKuduTableColumnNames.contains(currentFieldName)) {
+ extractGetterForColumn(normalizedColumns.get(currentFieldName),aFieldDef);
+ } else {
+ if (columnNameOverrides.containsKey(aFieldDef.getName())) {
+ extractGetterForColumn(normalizedColumns.get(columnNameOverrides.get(aFieldDef.getName()).toLowerCase()),
+ aFieldDef);
+ } else if (columnNameOverrides.containsKey(aFieldDef.getName().toLowerCase())) {
+ extractGetterForColumn(normalizedColumns.get(columnNameOverrides.get(aFieldDef.getName().toLowerCase())
+ .toLowerCase()),aFieldDef);
+ }
+ }
+ }
+ }
+
+ /***
+ * Used to build a getter for the given schema column from the POJO field definition
+ * @param columnSchema The Kudu column definition
+ * @param fieldDefinition The POJO field definition
+ */
+ private void extractGetterForColumn(ColumnSchema columnSchema, Field fieldDefinition)
+ {
+ Type columnType = columnSchema.getType();
+ Class pojoClass = getTuplePayloadClass();
+ Object getter = null;
+ switch ( columnType ) {
+ case BINARY:
+ getter = PojoUtils.createGetter(pojoClass, fieldDefinition.getName(), ByteBuffer.class);
+ break;
+ case STRING:
+ getter = PojoUtils.createGetter(pojoClass, fieldDefinition.getName(), String.class);
+ break;
+ case BOOL:
+ getter = PojoUtils.createGetterBoolean(pojoClass, fieldDefinition.getName());
+ break;
+ case DOUBLE:
+ getter = PojoUtils.createGetterDouble(pojoClass, fieldDefinition.getName());
+ break;
+ case FLOAT:
+ getter = PojoUtils.createGetterFloat(pojoClass, fieldDefinition.getName());
+ break;
+ case INT8:
+ getter = PojoUtils.createGetterByte(pojoClass, fieldDefinition.getName());
+ break;
+ case INT16:
+ getter = PojoUtils.createGetterShort(pojoClass, fieldDefinition.getName());
+ break;
+ case INT32:
+ getter = PojoUtils.createGetterInt(pojoClass, fieldDefinition.getName());
+ break;
+ case INT64:
+ case UNIXTIME_MICROS:
+ getter = PojoUtils.createGetterLong(pojoClass, fieldDefinition.getName());
+ break;
+ default:
+ LOG.error(fieldDefinition.getName() + " has a data type that is not yet supported");
+ throw new UnsupportedOperationException(fieldDefinition.getName() + " does not have a compatible data type");
+ }
+ if (getter != null) {
+ kuduColumnBasedGetters.put(columnSchema.getName(),getter);
+ }
+ }
+
+
+ public static ApexKuduConnection.ApexKuduConnectionBuilder usingKuduConnectionBuilder()
+ {
+ return new ApexKuduConnection.ApexKuduConnectionBuilder();
+ }
+
+ public WindowDataManager getWindowDataManager()
+ {
+ return windowDataManager;
+ }
+
+ public void setWindowDataManager(WindowDataManager windowDataManager)
+ {
+ this.windowDataManager = windowDataManager;
+ }
+
+ /***
+ * Allows to map a POJO field name to a Kudu Table column name. This is useful in case
+ * the POJO field name can't be changed to an unconventional name ( ex: if kudu column names have underscores ). It
+ * can be also useful when the developer does not want to declare a new POJO but reuse an existing POJO.
+ * Note that the key in the map is the name of the field in the POJO and
+ * the value part is used to denote the name of the kudu column
+ * @return The map giving the mapping from POJO field name to the Kudu column name
+ */
+ protected Map<String,String> getOverridingColumnNameMap()
+ {
+ return new HashMap<>();
+ }
+
+ abstract ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig();
+
+ /***
+ * Represents the Tuple payload class that maps to a Kudu table row. Note that the POJO fields are mapped to the
+ * kudu column names is a lenient way. For example, the mapping of POJO field names to the kudu columns is done
+ * in a case-insensitive way.
+ * @return The class that will be used to map to a row in the given Kudu table.
+ */
+ protected abstract Class getTuplePayloadClass();
+
+ /***
+ * This is used to give control to the concrete implementation of this operator how to resolve whether to write a
+ * mutation into a given kudu table in the event of a failure and the operator subsequently resumes. This window
+ * is marked as a reconciling window. It is only for this reconciling window that we need to give control to the
+ * concrete operator implementor how to actually resolve if the entry needs to be excuted as a mutation in Kudu.
+ * @param executionContext The tuple which represents the execution context along with the payload
+ * @param reconcilingWindowId The window Id of the reconciling window
+ * @return true if we would like the entry to result in a mutation in the Kudu table.
+ */
+ protected abstract boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
+ long reconcilingWindowId);
+}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
new file mode 100644
index 0000000..aed6b8b
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.SessionConfiguration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * <p>Represents a connection to the Kudu cluster. An instance of this class is to be supplied (via a builder pattern to)
+ * {@link AbstractKuduOutputOperator} to connect to a Kudu cluster.</p>
+ */
+
+public class ApexKuduConnection implements AutoCloseable, Serializable
+{
+ private static final long serialVersionUID = 4720185362997969198L;
+
+ private transient KuduSession kuduSession;
+
+ private transient KuduTable kuduTable;
+
+ private transient KuduClient kuduClient;
+
+ public static final Logger LOG = LoggerFactory.getLogger(ApexKuduConnection.class);
+
+
+ private ApexKuduConnection(ApexKuduConnectionBuilder builder)
+ {
+ checkNotNull(builder,"Builder cannot be null to establish kudu session");
+ checkArgument(builder.mastersCollection.size() > 0, "Atleast one kudu master needs to be specified");
+ checkNotNull(builder.tableName,"Kudu table cannot be null");
+ KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(builder.mastersCollection);
+ if (builder.isOperationTimeOutSet) {
+ kuduClientBuilder.defaultOperationTimeoutMs(builder.operationTimeOutMs);
+ }
+ if (builder.isBossThreadCountSet) {
+ kuduClientBuilder.bossCount(builder.numBossThreads);
+ }
+ if (builder.isWorkerThreadsCountSet) {
+ kuduClientBuilder.workerCount(builder.workerThreads);
+ }
+ if (builder.isSocketReadTimeOutSet) {
+ kuduClientBuilder.defaultSocketReadTimeoutMs(builder.socketReadTimeOutMs);
+ }
+ kuduClient = kuduClientBuilder.build();
+ kuduSession = kuduClient.newSession();
+ if (builder.isFlushModeSet) {
+ kuduSession.setFlushMode(builder.flushMode);
+ }
+ if (builder.isExternalConsistencyModeSet) {
+ kuduSession.setExternalConsistencyMode(builder.externalConsistencyMode);
+ }
+ try {
+ if (!kuduClient.tableExists(builder.tableName)) {
+ throw new Exception("Table " + builder.tableName + " does not exist. ");
+ } else {
+ kuduTable = kuduClient.openTable(builder.tableName);
+ }
+ } catch (Exception e) {
+ LOG.error("Kudu table existence could not be ascertained " + e.getMessage());
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ kuduSession.close();
+ kuduClient.close();
+ }
+
+ public KuduSession getKuduSession()
+ {
+ return kuduSession;
+ }
+
+ public void setKuduSession(KuduSession kuduSession)
+ {
+ this.kuduSession = kuduSession;
+ }
+
+ public KuduTable getKuduTable()
+ {
+ return kuduTable;
+ }
+
+ public void setKuduTable(KuduTable kuduTable)
+ {
+ this.kuduTable = kuduTable;
+ }
+
+ public KuduClient getKuduClient()
+ {
+ return kuduClient;
+ }
+
+ public void setKuduClient(KuduClient kuduClient)
+ {
+ this.kuduClient = kuduClient;
+ }
+
+ public static class ApexKuduConnectionBuilder
+ {
+ List<String> mastersCollection = new ArrayList<>();
+
+ String tableName;
+
+ // optional props
+ int numBossThreads = 1;
+
+ boolean isBossThreadCountSet = false;
+
+ int workerThreads = 2 * Runtime.getRuntime().availableProcessors();
+
+ boolean isWorkerThreadsCountSet = false;
+
+ long socketReadTimeOutMs = 10000;
+
+ boolean isSocketReadTimeOutSet = false;
+
+ long operationTimeOutMs = 30000;
+
+ boolean isOperationTimeOutSet = false;
+
+ ExternalConsistencyMode externalConsistencyMode;
+
+ boolean isExternalConsistencyModeSet = false;
+
+ SessionConfiguration.FlushMode flushMode;
+
+ boolean isFlushModeSet = false;
+
+ public ApexKuduConnectionBuilder withTableName(String tableName)
+ {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public ApexKuduConnectionBuilder withAPossibleMasterHostAs(String masterHostAndPort)
+ {
+ mastersCollection.add(masterHostAndPort);
+ return this;
+ }
+
+ public ApexKuduConnectionBuilder withNumberOfBossThreads(int numberOfBossThreads)
+ {
+ this.numBossThreads = numberOfBossThreads;
+ isBossThreadCountSet = true;
+ return this;
+ }
+
+ public ApexKuduConnectionBuilder withNumberOfWorkerThreads(int numberOfWorkerThreads)
+ {
+ this.workerThreads = numberOfWorkerThreads;
+ isWorkerThreadsCountSet = true;
+ return this;
+ }
+
+ public ApexKuduConnectionBuilder withSocketReadTimeOutAs(long socketReadTimeOut)
+ {
+ this.socketReadTimeOutMs = socketReadTimeOut;
+ isSocketReadTimeOutSet = true;
+ return this;
+ }
+
+ public ApexKuduConnectionBuilder withOperationTimeOutAs(long operationTimeOut)
+ {
+ this.operationTimeOutMs = operationTimeOut;
+ isOperationTimeOutSet = true;
+ return this;
+ }
+
+ public ApexKuduConnectionBuilder withExternalConsistencyMode(ExternalConsistencyMode externalConsistencyMode)
+ {
+ this.externalConsistencyMode = externalConsistencyMode;
+ isExternalConsistencyModeSet = true;
+ return this;
+ }
+
+ public ApexKuduConnectionBuilder withFlushMode(SessionConfiguration.FlushMode flushMode)
+ {
+ this.flushMode = flushMode;
+ isFlushModeSet = true;
+ return this;
+ }
+
+ protected ApexKuduConnection build()
+ {
+ ApexKuduConnection apexKuduConnection = new ApexKuduConnection(this);
+ return apexKuduConnection;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
new file mode 100644
index 0000000..6de7190
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.SessionConfiguration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Provides a default implementation for writing tuples as Kudu rows.
+ * The user will have to either provide for
+ * <ol>
+ * <li>a property file containing properties like Kudu master list, Kudu table name and other connection properties.
+ * The operator will fail to launch if the properties file named kuduoutputoperator.properties is not locatable in the
+ * root path</li>
+ * <li>Use the default constructor which supports minimum required properties as parameters</li>
+ * <li>In case of presence of multiple Kudu output operators in the same application, use the String based
+ * constructor which accepts a file name for each of the kudu table that the incoming pojo needs to be passivated
+ * to</li>
+ * </ol>
+ * <p>
+ * The properties file will have to consist of the following keys:
+ * <br>masterhosts=<ip1:host>,<ip2:host>,..# Comma separated</br>
+ * <br>tablename=akudutablename</br>
+ * <br>pojoclassname=somepojoclasswithgettersandsetters; # Do not append name with .class at the end and
+ * do not forget to give the complete class name including the package</br>
+ * </p>
+ */
+public class BaseKuduOutputOperator extends AbstractKuduOutputOperator
+{
+ public static final String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduoutputoperator.properties";
+
+ public static final String TABLE_NAME = "tablename";
+
+ public static final String MASTER_HOSTS = "masterhosts";
+
+ public static final String POJO_CLASS_NAME = "pojoclassname";
+
+ private Class pojoPayloadClass;
+
+ private ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder;
+
+ public BaseKuduOutputOperator() throws IOException, ClassNotFoundException
+ {
+ initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME);
+ }
+
+ public BaseKuduOutputOperator(String configFileInClasspath) throws IOException, ClassNotFoundException
+ {
+ initConnectionBuilderProperties(configFileInClasspath);
+ }
+
+ private void initConnectionBuilderProperties(String configFileInClasspath) throws IOException, ClassNotFoundException
+ {
+ Properties kuduConnectionProperties = new Properties();
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ InputStream kuduPropsFileAsStream = loader.getResourceAsStream(configFileInClasspath);
+ if (kuduPropsFileAsStream != null) {
+ kuduConnectionProperties.load(kuduPropsFileAsStream);
+ } else {
+ throw new IOException("Properties file required for Kudu connection " + configFileInClasspath +
+ " is not locatable in the root classpath");
+ }
+ String tableName = checkNotNull(kuduConnectionProperties.getProperty(TABLE_NAME));
+ String pojoClassName = checkNotNull(kuduConnectionProperties.getProperty(POJO_CLASS_NAME));
+ String masterHostsConnectionString = checkNotNull(kuduConnectionProperties.getProperty(MASTER_HOSTS));
+ String[] masterAndHosts = masterHostsConnectionString.split(",");
+ pojoPayloadClass = Class.forName(pojoClassName);
+ initKuduConfig(tableName, Arrays.asList(masterAndHosts));
+ }
+
+ private void initKuduConfig(String kuduTableName, List<String> kuduMasters)
+ {
+ apexKuduConnectionBuilder = new ApexKuduConnection.ApexKuduConnectionBuilder()
+ .withTableName(kuduTableName)
+ .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
+ .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
+ .withNumberOfBossThreads(1)
+ .withNumberOfWorkerThreads(2)
+ .withSocketReadTimeOutAs(3000)
+ .withOperationTimeOutAs(3000);
+ for ( String aMasterAndHost: kuduMasters ) {
+ apexKuduConnectionBuilder = apexKuduConnectionBuilder.withAPossibleMasterHostAs(aMasterAndHost);
+ }
+ }
+
+
+ public BaseKuduOutputOperator(String kuduTableName,List<String> kuduMasters, Class pojoPayloadClass)
+ {
+ this.pojoPayloadClass = pojoPayloadClass;
+ initKuduConfig(kuduTableName,kuduMasters);
+ }
+
+ @Override
+ ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig()
+ {
+ return apexKuduConnectionBuilder;
+ }
+
+ /**
+ * Can be used to further fine tune any of the connection configs once the constructor completes instantiating the
+ * Kudu Connection config builder.
+ * @return The Connection Config that would be used to initiate a connection to the Kudu Cluster once the operator is
+ * deserialized in the node manager managed container. See {@link AbstractKuduOutputOperator} activate() method for
+ * more details.
+ */
+ public ApexKuduConnection.ApexKuduConnectionBuilder getApexKuduConnectionBuilder()
+ {
+ return apexKuduConnectionBuilder;
+ }
+
+
+ /**
+ *
+ * @return The pojo class that would be streamed in the KuduExecutionContext
+ */
+ @Override
+ protected Class getTuplePayloadClass()
+ {
+ return pojoPayloadClass;
+ }
+
+ /**
+ * The default is to implement for ATLEAST_ONCE semantics. Override this control this behavior.
+ * @param executionContext The tuple which represents the execution context along with the payload
+ * @param reconcilingWindowId The window Id of the reconciling window
+ * @return
+ */
+ @Override
+ protected boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
+ long reconcilingWindowId)
+ {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
new file mode 100644
index 0000000..27d382d
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+
+/**
+ * <p>Represents a summary of the mutation that needs to be done on the Kudu table. The type of mutation is
+ * decided by the KuduMutation Type field. The actual data that is mutated inside the kudu table row is
+ * represented by the payload. The execution context itself a templated class based on the payload class.</p>
+ */
+public class KuduExecutionContext<T>
+{
+ private T payload;
+ /***
+ * <p>Represents the set of columns that are not to be written to a Kudu row. Note that this is useful when we
+ * would like to not write a set of columns into the table either because
+ * <ol>
+ * <li>We are doing an update and we would like to update only a few of the columns as the original columns were
+ * already written in the original insert mutation</li>
+ * <li>When we would like to not write a column because the column is an optional column as per the schema definition
+ * </li>
+ * </ol>
+ * It may be noted that the client driver will throw an exception when a mandatory column is not written.</p>
+ */
+ private Set<String> doNotWriteColumns = new HashSet<>();
+
+ private KuduMutationType mutationType = KuduMutationType.UPSERT;
+
+ private ExternalConsistencyMode externalConsistencyMode;
+
+ private long propagatedTimestamp;
+
+ public T getPayload()
+ {
+ return payload;
+ }
+
+ public void setPayload(T payload)
+ {
+ this.payload = payload;
+ }
+
+ public KuduMutationType getMutationType()
+ {
+ return mutationType;
+ }
+
+ public void setMutationType(KuduMutationType mutationType)
+ {
+ this.mutationType = mutationType;
+ }
+
+ public ExternalConsistencyMode getExternalConsistencyMode()
+ {
+ return externalConsistencyMode;
+ }
+
+ public void setExternalConsistencyMode(ExternalConsistencyMode externalConsistencyMode)
+ {
+ this.externalConsistencyMode = externalConsistencyMode;
+ }
+
+ public long getPropagatedTimestamp()
+ {
+ return propagatedTimestamp;
+ }
+
+ public void setPropagatedTimestamp(long propagatedTimestamp)
+ {
+ this.propagatedTimestamp = propagatedTimestamp;
+ }
+
+ public Set<String> getDoNotWriteColumns()
+ {
+ return doNotWriteColumns;
+ }
+
+ public void setDoNotWriteColumns(Set<String> doNotWriteColumns)
+ {
+ this.doNotWriteColumns = doNotWriteColumns;
+ }
+
+ public void addDoNotWriteColumn(String aKuduColumnName)
+ {
+ doNotWriteColumns.add(aKuduColumnName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
new file mode 100644
index 0000000..64b46c6
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+/**
+ * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
+ * mutation being represented by the current tuple</p>
+ */
+public enum KuduMutationType
+{
+
+ INSERT,
+ DELETE,
+ UPDATE,
+ UPSERT
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
new file mode 100644
index 0000000..615427a
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.helper.TestPortContext;
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+import static org.junit.Assert.assertEquals;
+
+
+public class KuduCreateUpdateDeleteOutputOperatorTest
+{
+
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(KuduCreateUpdateDeleteOutputOperatorTest.class);
+
+ private static final String tableName = "unittests";
+
+ private final String APP_ID = "TestKuduOutputOperator";
+
+ private final int OPERATOR_ID_FOR_KUDU_CRUD = 0;
+
+ private static KuduClient kuduClient;
+
+ private static KuduTable kuduTable;
+
+ private static Map<String,ColumnSchema> columnDefs = new HashMap<>();
+
+ private BaseKuduOutputOperator simpleKuduOutputOperator;
+
+ private OperatorContext contextForKuduOutputOperator;
+
+ private TestPortContext testPortContextForKuduOutput;
+
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ kuduClient = getClientHandle();
+ if (kuduClient.tableExists(tableName)) {
+ kuduClient.deleteTable(tableName);
+ }
+ createTestTable(tableName,kuduClient);
+ kuduTable = kuduClient.openTable(tableName);
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception
+ {
+ kuduClient.close();
+ }
+
+ @Before
+ public void setUpKuduOutputOperatorContext() throws Exception
+ {
+ Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ contextForKuduOutputOperator = mockOperatorContext(OPERATOR_ID_FOR_KUDU_CRUD, attributeMap);
+ simpleKuduOutputOperator = new BaseKuduOutputOperator();
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, UnitTestTablePojo.class);
+ testPortContextForKuduOutput = new TestPortContext(portAttributes);
+ simpleKuduOutputOperator.setup(contextForKuduOutputOperator);
+ simpleKuduOutputOperator.activate(contextForKuduOutputOperator);
+ simpleKuduOutputOperator.input.setup(testPortContextForKuduOutput);
+ }
+
+ private static KuduClient getClientHandle() throws Exception
+ {
+ KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder("localhost:7051");
+ KuduClient client = builder.build();
+ return client;
+ }
+
+ private static void createTestTable(String tableName, KuduClient client) throws Exception
+ {
+ List<ColumnSchema> columns = new ArrayList<>();
+ ColumnSchema intRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("introwkey", Type.INT32)
+ .key(true)
+ .build();
+ columns.add(intRowKeyCol);
+ columnDefs.put("introwkey",intRowKeyCol);
+ ColumnSchema stringRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("stringrowkey", Type.STRING)
+ .key(true)
+ .build();
+ columns.add(stringRowKeyCol);
+ columnDefs.put("stringrowkey",stringRowKeyCol);
+ ColumnSchema timestampRowKey = new ColumnSchema.ColumnSchemaBuilder("timestamprowkey", Type.UNIXTIME_MICROS)
+ .key(true)
+ .build();
+ columns.add(timestampRowKey);
+ columnDefs.put("timestamprowkey",timestampRowKey);
+ ColumnSchema longData = new ColumnSchema.ColumnSchemaBuilder("longdata", Type.INT64)
+ .build();
+ columns.add(longData);
+ columnDefs.put("longdata",longData);
+ ColumnSchema stringData = new ColumnSchema.ColumnSchemaBuilder("stringdata", Type.STRING)
+ .build();
+ columns.add(stringData);
+ columnDefs.put("stringdata",stringData);
+ ColumnSchema timestampdata = new ColumnSchema.ColumnSchemaBuilder("timestampdata", Type.UNIXTIME_MICROS)
+ .build();
+ columns.add(timestampdata);
+ columnDefs.put("timestampdata",timestampdata);
+ ColumnSchema binarydata = new ColumnSchema.ColumnSchemaBuilder("binarydata", Type.BINARY)
+ .build();
+ columns.add(binarydata);
+ columnDefs.put("binarydata",binarydata);
+ ColumnSchema floatdata = new ColumnSchema.ColumnSchemaBuilder("floatdata", Type.FLOAT)
+ .build();
+ columns.add(floatdata);
+ columnDefs.put("floatdata",floatdata);
+ ColumnSchema booldata = new ColumnSchema.ColumnSchemaBuilder("booldata", Type.BOOL)
+ .build();
+ columns.add(booldata);
+ columnDefs.put("booldata",booldata);
+ List<String> rangeKeys = new ArrayList<>();
+ rangeKeys.add("stringrowkey");
+ rangeKeys.add("timestamprowkey");
+ List<String> hashPartitions = new ArrayList<>();
+ hashPartitions.add("introwkey");
+ Schema schema = new Schema(columns);
+ try {
+ client.createTable(tableName, schema,
+ new CreateTableOptions()
+ .setNumReplicas(1)
+ .setRangePartitionColumns(rangeKeys)
+ .addHashPartitions(hashPartitions,2));
+ } catch (KuduException e) {
+ LOG.error("Error while creating table for unit tests " + e.getMessage(), e);
+ throw e;
+ }
+ }
+
+ private void lookUpAndPopulateRecord(UnitTestTablePojo keyInfo) throws Exception
+ {
+ KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable)
+ .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("introwkey"),
+ KuduPredicate.ComparisonOp.EQUAL,keyInfo.getIntrowkey()))
+ .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("stringrowkey"),
+ KuduPredicate.ComparisonOp.EQUAL,keyInfo.getStringrowkey()))
+ .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("timestamprowkey"),
+ KuduPredicate.ComparisonOp.EQUAL,keyInfo.getTimestamprowkey()))
+ .build();
+ RowResultIterator rowResultItr = scanner.nextRows();
+ while (rowResultItr.hasNext()) {
+ RowResult thisRow = rowResultItr.next();
+ keyInfo.setFloatdata(thisRow.getFloat("floatdata"));
+ keyInfo.setBooldata(thisRow.getBoolean("booldata"));
+ keyInfo.setBinarydata(thisRow.getBinary("binarydata"));
+ keyInfo.setLongdata(thisRow.getLong("longdata"));
+ keyInfo.setTimestampdata(thisRow.getLong("timestampdata"));
+ keyInfo.setStringdata("stringdata");
+ break;
+ }
+ }
+
+ @Test
+ public void processForUpdate() throws Exception
+ {
+ KuduExecutionContext<UnitTestTablePojo> newInsertExecutionContext = new KuduExecutionContext<>();
+ UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+ unitTestTablePojo.setIntrowkey(2);
+ unitTestTablePojo.setStringrowkey("two" + System.currentTimeMillis());
+ unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+ unitTestTablePojo.setBooldata(true);
+ unitTestTablePojo.setFloatdata(3.2f);
+ unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+ unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+ unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+ unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+ newInsertExecutionContext.setMutationType(KuduMutationType.INSERT);
+ newInsertExecutionContext.setPayload(unitTestTablePojo);
+
+ simpleKuduOutputOperator.beginWindow(1);
+ simpleKuduOutputOperator.input.process(newInsertExecutionContext);
+ KuduExecutionContext<UnitTestTablePojo> updateExecutionContext = new KuduExecutionContext<>();
+ UnitTestTablePojo updatingRecord = new UnitTestTablePojo();
+ updateExecutionContext.setMutationType(KuduMutationType.UPDATE);
+ updatingRecord.setBooldata(false);
+ updatingRecord.setIntrowkey(unitTestTablePojo.getIntrowkey());
+ updatingRecord.setStringrowkey(unitTestTablePojo.getStringrowkey());
+ updatingRecord.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+ updateExecutionContext.setPayload(updatingRecord);
+ simpleKuduOutputOperator.input.process(updateExecutionContext);
+ simpleKuduOutputOperator.endWindow();
+
+ UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+ unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+ unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+ unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+ lookUpAndPopulateRecord(unitTestTablePojoRead);
+ assertEquals(unitTestTablePojoRead.isBooldata(), false);
+ }
+
+ @Test
+ public void processForUpsert() throws Exception
+ {
+ KuduExecutionContext<UnitTestTablePojo> upsertExecutionContext = new KuduExecutionContext<>();
+ UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+ unitTestTablePojo.setIntrowkey(3);
+ unitTestTablePojo.setStringrowkey("three" + System.currentTimeMillis());
+ unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+ unitTestTablePojo.setBooldata(false);
+ unitTestTablePojo.setFloatdata(3.2f);
+ unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+ unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+ unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+ unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+ upsertExecutionContext.setMutationType(KuduMutationType.UPSERT);
+ upsertExecutionContext.setPayload(unitTestTablePojo);
+
+ simpleKuduOutputOperator.beginWindow(2);
+ simpleKuduOutputOperator.input.process(upsertExecutionContext);
+ upsertExecutionContext.setMutationType(KuduMutationType.UPSERT);
+ unitTestTablePojo.setBooldata(true);
+ simpleKuduOutputOperator.input.process(upsertExecutionContext);
+ simpleKuduOutputOperator.endWindow();
+
+ UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+ unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+ unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+ unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+ lookUpAndPopulateRecord(unitTestTablePojoRead);
+ assertEquals(unitTestTablePojoRead.isBooldata(), true);
+ }
+
+ @Test
+ public void processForDelete() throws Exception
+ {
+ KuduExecutionContext<UnitTestTablePojo> insertExecutionContext = new KuduExecutionContext<>();
+ UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+ unitTestTablePojo.setIntrowkey(4);
+ unitTestTablePojo.setStringrowkey("four" + System.currentTimeMillis());
+ unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+ unitTestTablePojo.setBooldata(false);
+ unitTestTablePojo.setFloatdata(3.2f);
+ unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+ unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+ unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+ unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+ insertExecutionContext.setMutationType(KuduMutationType.INSERT);
+ insertExecutionContext.setPayload(unitTestTablePojo);
+
+ simpleKuduOutputOperator.beginWindow(3);
+ simpleKuduOutputOperator.input.process(insertExecutionContext);
+ KuduExecutionContext<UnitTestTablePojo> deleteExecutionContext = new KuduExecutionContext<>();
+ UnitTestTablePojo unitTestTablePojoDelete = new UnitTestTablePojo();
+ unitTestTablePojoDelete.setIntrowkey(unitTestTablePojo.getIntrowkey());
+ unitTestTablePojoDelete.setStringrowkey(unitTestTablePojo.getStringrowkey());
+ unitTestTablePojoDelete.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+ deleteExecutionContext.setMutationType(KuduMutationType.DELETE);
+ deleteExecutionContext.setPayload(unitTestTablePojoDelete);
+ simpleKuduOutputOperator.input.process(deleteExecutionContext);
+ simpleKuduOutputOperator.endWindow();
+
+ UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+ unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+ unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+ unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+ lookUpAndPopulateRecord(unitTestTablePojoRead);
+ assertEquals(unitTestTablePojoRead.getBinarydata(), null);
+ }
+
+ @Test
+ public void processForInsert() throws Exception
+ {
+ KuduExecutionContext<UnitTestTablePojo> insertExecutionContext = new KuduExecutionContext<>();
+ UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+ unitTestTablePojo.setIntrowkey(1);
+ unitTestTablePojo.setStringrowkey("one" + System.currentTimeMillis());
+ unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+ unitTestTablePojo.setBooldata(true);
+ unitTestTablePojo.setFloatdata(3.2f);
+ unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+ unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+ unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+ unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+ insertExecutionContext.setMutationType(KuduMutationType.INSERT);
+ insertExecutionContext.setPayload(unitTestTablePojo);
+
+ simpleKuduOutputOperator.beginWindow(0);
+ simpleKuduOutputOperator.input.process(insertExecutionContext);
+ simpleKuduOutputOperator.endWindow();
+
+ UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+ unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+ unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+ unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+ lookUpAndPopulateRecord(unitTestTablePojoRead);
+ assertEquals("" + unitTestTablePojoRead.getFloatdata(),"" + unitTestTablePojo.getFloatdata());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
new file mode 100644
index 0000000..3933df7
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.SessionConfiguration;
+
+public class SimpleKuduOutputOperator extends AbstractKuduOutputOperator
+{
+ @Override
+ ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig()
+ {
+ return new ApexKuduConnection.ApexKuduConnectionBuilder()
+ .withAPossibleMasterHostAs("localhost:7051")
+ .withTableName("unittests")
+ .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
+ .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
+ .withNumberOfBossThreads(1)
+ .withNumberOfWorkerThreads(2)
+ .withSocketReadTimeOutAs(3000)
+ .withOperationTimeOutAs(3000);
+ }
+
+ @Override
+ protected boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
+ long reconcilingWindowId)
+ {
+ return true;
+ }
+
+ @Override
+ protected Class getTuplePayloadClass()
+ {
+ return UnitTestTablePojo.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
new file mode 100644
index 0000000..dc2cc33
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+
+import java.nio.ByteBuffer;
+
+public class UnitTestTablePojo
+{
+ private int introwkey;
+ private String stringrowkey;
+ private long timestamprowkey;
+ private long longdata;
+ private String stringdata;
+ private long timestampdata;
+ private ByteBuffer binarydata;
+ private float floatdata;
+ private boolean booldata;
+
+ public int getIntrowkey()
+ {
+ return introwkey;
+ }
+
+ public void setIntrowkey(int introwkey)
+ {
+ this.introwkey = introwkey;
+ }
+
+ public String getStringrowkey()
+ {
+ return stringrowkey;
+ }
+
+ public void setStringrowkey(String stringrowkey)
+ {
+ this.stringrowkey = stringrowkey;
+ }
+
+ public long getTimestamprowkey()
+ {
+ return timestamprowkey;
+ }
+
+ public void setTimestamprowkey(long timestamprowkey)
+ {
+ this.timestamprowkey = timestamprowkey;
+ }
+
+ public long getLongdata()
+ {
+ return longdata;
+ }
+
+ public void setLongdata(long longdata)
+ {
+ this.longdata = longdata;
+ }
+
+ public String getStringdata()
+ {
+ return stringdata;
+ }
+
+ public void setStringdata(String stringdata)
+ {
+ this.stringdata = stringdata;
+ }
+
+ public long getTimestampdata()
+ {
+ return timestampdata;
+ }
+
+ public void setTimestampdata(long timestampdata)
+ {
+ this.timestampdata = timestampdata;
+ }
+
+ public ByteBuffer getBinarydata()
+ {
+ return binarydata;
+ }
+
+ public void setBinarydata(ByteBuffer binarydata)
+ {
+ this.binarydata = binarydata;
+ }
+
+ public float getFloatdata()
+ {
+ return floatdata;
+ }
+
+ public void setFloatdata(float floatdata)
+ {
+ this.floatdata = floatdata;
+ }
+
+ public boolean isBooldata()
+ {
+ return booldata;
+ }
+
+ public void setBooldata(boolean booldata)
+ {
+ this.booldata = booldata;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/resources/kuduoutputoperator.properties
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/kuduoutputoperator.properties b/contrib/src/test/resources/kuduoutputoperator.properties
new file mode 100644
index 0000000..8f41c63
--- /dev/null
+++ b/contrib/src/test/resources/kuduoutputoperator.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+masterhosts=192.168.2.141:7051,192.168.2.133:7051
+tablename=unittests
+pojoclassname=org.apache.apex.malhar.contrib.kudu.UnitTestTablePojo
\ No newline at end of file