You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/09/27 14:56:29 UTC

[apex-malhar] branch master updated: APEXMALHAR-2472.KuduInputOperator Initial implementation and addressing review comments by Thomas, Vlad and Pramod

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new 20c407b  APEXMALHAR-2472.KuduInputOperator Initial implementation and addressing review comments by Thomas,Vlad and Pramod
20c407b is described below

commit 20c407b145a063f7d541164bde732fbd7fd68a77
Author: Ananth <an...@gmail.com>
AuthorDate: Tue Sep 26 04:14:17 2017 +1000

    APEXMALHAR-2472.KuduInputOperator Initial implementation and addressing review comments by Thomas,Vlad and Pramod
---
 contrib/pom.xml                                    |    6 -
 kudu/pom.xml                                       |  168 ++++
 .../malhar/kudu/sqlparser/KuduSQLExpression.g4     |  169 ++++
 .../malhar/kudu/AbstractKuduInputOperator.java     | 1035 ++++++++++++++++++++
 .../malhar}/kudu/AbstractKuduOutputOperator.java   |   17 +-
 .../apex/malhar}/kudu/ApexKuduConnection.java      |   22 +-
 .../apex/malhar}/kudu/BaseKuduOutputOperator.java  |    8 +-
 .../kudu/IncrementalStepScanInputOperator.java     |  261 +++++
 .../malhar/kudu/InputOperatorControlTuple.java     |   81 ++
 .../apex/malhar}/kudu/KuduExecutionContext.java    |    2 +-
 .../apache/apex/malhar}/kudu/KuduMutationType.java |    2 +-
 .../org/apache/apex/malhar/kudu/package-info.java  |   21 +-
 .../partitioner/AbstractKuduInputPartitioner.java  |  251 +++++
 .../kudu/partitioner/KuduOneToManyPartitioner.java |   86 ++
 .../kudu/partitioner/KuduOneToOnePartitioner.java  |   71 ++
 .../partitioner/KuduPartitionScanStrategy.java     |   14 +-
 .../apex/malhar/kudu/partitioner/package-info.java |   20 +-
 .../kudu/scanner/AbstractKuduPartitionScanner.java |  242 +++++
 .../KuduPartitionConsistentOrderScanner.java       |  107 ++
 .../scanner/KuduPartitionRandomOrderScanner.java   |   67 ++
 .../scanner/KuduPartitionScanAssignmentMeta.java   |  120 +++
 .../kudu/scanner/KuduPartitionScannerCallable.java |  204 ++++
 .../malhar/kudu/scanner/KuduRecordWithMeta.java    |   88 ++
 .../malhar/kudu/scanner/KuduScanOrderStrategy.java |   14 +-
 .../apex/malhar/kudu/scanner/package-info.java     |   22 +-
 .../KuduSQLExpressionErrorListener.java            |   65 ++
 .../sqltranslator/KuduSQLParseTreeListener.java    |  601 ++++++++++++
 .../malhar/kudu/sqltranslator/KuduSQLParser.java   |   49 +
 .../SQLToKuduPredicatesTranslator.java             |  129 +++
 .../malhar/kudu/sqltranslator/package-info.java    |   21 +-
 .../malhar/kudu/AbstractKuduInputOperatorTest.java |   86 ++
 .../kudu/IncrementalStepScanInputOperatorTest.java |  117 +++
 .../apex/malhar/kudu/KuduClientTestCommons.java    |  260 +++++
 .../KuduCreateUpdateDeleteOutputOperatorTest.java  |  154 +--
 .../apex/malhar/kudu/KuduInputOperatorCommons.java |  212 ++++
 .../malhar}/kudu/SimpleKuduOutputOperator.java     |   12 +-
 .../kudu/UnitTestStepwiseScanInputOperator.java    |   32 +-
 .../apex/malhar}/kudu/UnitTestTablePojo.java       |   18 +-
 .../AbstractKuduInputPartitionerTest.java          |  113 +++
 .../partitioner/KuduOneToManyPartitionerTest.java  |   82 ++
 .../partitioner/KuduOneToOnePartitionerTest.java   |   76 ++
 .../scanner/AbstractKuduPartitionScannerTest.java  |   98 ++
 .../scanner/KuduPartitionScannerCallableTest.java  |  133 +++
 .../SQLToKuduPredicatesTranslatorTest.java         |  156 +++
 .../kudu/test/KuduClusterAvailabilityTestRule.java |  162 +++
 .../malhar/kudu/test/KuduClusterTestContext.java   |   21 +-
 ...kuduincrementalstepscaninputoperator.properties |    9 +-
 .../test/resources/kuduoutputoperator.properties   |    4 +-
 kudu/src/test/resources/log4j.properties           |   45 +
 pom.xml                                            |    1 +
 50 files changed, 5499 insertions(+), 255 deletions(-)

diff --git a/contrib/pom.xml b/contrib/pom.xml
index 3c5797c..1434c49 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -672,11 +672,5 @@
       <artifactId>jackson-databind</artifactId>
       <version>2.7.0</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.kudu</groupId>
-      <artifactId>kudu-client</artifactId>
-      <version>1.3.0</version>
-      <optional>true</optional>
-    </dependency>
   </dependencies>
 </project>
diff --git a/kudu/pom.xml b/kudu/pom.xml
new file mode 100755
index 0000000..2b518ae
--- /dev/null
+++ b/kudu/pom.xml
@@ -0,0 +1,168 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-kudu</artifactId>
+  <name>Apache Apex Malhar Kudu Support</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <checkstyle.console>false</checkstyle.console>
+    <kudu-client-version>1.5.0</kudu-client-version>
+    <disruptor-queue-conversant-media-version>1.2.10</disruptor-queue-conversant-media-version>
+    <antlr4-runtime-version>4.7</antlr4-runtime-version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.antlr</groupId>
+        <artifactId>antlr4-maven-plugin</artifactId>
+        <version>4.7</version>
+        <executions>
+          <execution>
+            <id>kudusqlantlr4parsergen</id>
+            <goals>
+              <goal>antlr4</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
+          <excludes>**/KuduSQLExpression*</excludes> <!--To account for violations from the Antlr autogenerated code -->
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes combine.children="append">
+            <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpression.tokens</exclude>
+            <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpressionLexer.tokens</exclude>
+            <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpressionLexer.java</exclude>
+            <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpressionListener.java</exclude>
+            <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpressionParser.java</exclude>
+            <exclude>src/main/antlr4/KuduSQLExpression.g4</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${basedir}/target/generated-sources/antlr4</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+      <type>jar</type>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${kudu-client-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr4-runtime</artifactId>
+      <version>${antlr4-runtime-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.conversantmedia</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor-queue-conversant-media-version}</version>
+      <classifier>jdk7</classifier><!-- Set classifier to jdk8 when malhar switches to JDK 8 -->
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>2.8.47</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.7.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito2</artifactId>
+      <version>1.7.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-junit</artifactId>
+      <version>2.0.0.0</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/kudu/src/main/antlr4/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpression.g4 b/kudu/src/main/antlr4/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpression.g4
new file mode 100644
index 0000000..ec3ea79
--- /dev/null
+++ b/kudu/src/main/antlr4/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpression.g4
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+grammar KuduSQLExpression;
+
+kudusqlexpression: (WHITESPACE)* SELECT (WHITESPACE)+ columnnamesselect (tableclause)? (whereclause)? (endstatement)? (withoptionsclause)? (WHITESPACE)*;
+
+columnnamesselect : '*' # ALL_COLUMNS_SELECT_EXP
+             | colnameselectexpression # COLNAME_BASED_SELECT_EXP
+             ;
+
+colnameselectexpression : colnameselectexpression (WHITESPACE)* COMMA (WHITESPACE)* colnameselectexpression # SELECT_COMPLEX_COL_EXPRESSION
+                    | idorcolumnname (WHITESPACE)+ AS (WHITESPACE)+ ID # SELECT_ALIAS_USED
+                    | idorcolumnname # SELECT_ID_ONLY_USED_AS_COLUMN_NAME
+                    ;
+
+idorcolumnname:       ID
+                    | '\'' (WHITESPACE)* FROM (WHITESPACE)* '\''
+                    | '\'' (WHITESPACE)* WHERE (WHITESPACE)* '\''
+                    | '\'' (WHITESPACE)* USING (WHITESPACE)* '\''
+                    | '\'' (WHITESPACE)* OPTIONS (WHITESPACE)* '\''
+                    | '\'' (WHITESPACE)* NOT (WHITESPACE)* '\''
+                    | '\'' (WHITESPACE)* NULL (WHITESPACE)* '\''
+                    | '\'' (WHITESPACE)* CONTROLTUPLE_MESSAGE (WHITESPACE)* '\''
+                    | '\'' (WHITESPACE)* READ_SNAPSHOT_TIME (WHITESPACE)* '\''
+              ;
+
+tableclause : (WHITESPACE)+ FROM (WHITESPACE)+ ID;
+
+whereclause : (WHITESPACE)+ WHERE (WHITESPACE)+ columnfilterexpression;
+
+withoptionsclause: (WHITESPACE)+ USING (WHITESPACE)+ OPTIONS (WHITESPACE)+ keyvaluepair;
+
+
+keyvaluepair:
+              READ_SNAPSHOT_TIME (WHITESPACE)* EQUAL_TO (WHITESPACE)* INT #SET_READ_SNAPSHOT_TIME
+             | CONTROLTUPLE_MESSAGE (WHITESPACE)* EQUAL_TO (WHITESPACE)* STRINGVAL #SET_CONTROL_TUPLE_MSG
+             | keyvaluepair (WHITESPACE)+ COMMA (WHITESPACE)* keyvaluepair #SET_MULTI_OPTIONS
+            ;
+
+columnfilterexpression: columnfilterexpression (WHITESPACE)* AND (WHITESPACE)*  columnfilterexpression #FILTER_COMPLEX_FILTER_EXP
+                      | idorcolumnname (WHITESPACE)* comparisionoperator (WHITESPACE)* anyvalue #FILTER_COMPARISION_EXP
+                      | idorcolumnname (WHITESPACE)+ IS (WHITESPACE)+ NOT (WHITESPACE)+ NULL #IS_NOT_NULL_FILTER_EXP
+                      | idorcolumnname (WHITESPACE)+ IS (WHITESPACE)+ NULL #IS_NULL_FILTER_EXP
+                      | idorcolumnname (WHITESPACE)+ IN (WHITESPACE)+ listofanyvalue #IN_FILTER_EXP
+                      ;
+
+
+comparisionoperator: EQUAL_TO
+                     | LESSER_THAN
+                     | LESSER_THAN_OR_EQUAL
+                     | GREATER_THAN
+                     | GREATER_THAN_OR_EQUAL
+                     ;
+anyvalue: bool
+        | numval
+        | doubleval
+        | floatval
+        | stringval
+        ;
+
+bool: TRUE
+    | FALSE;
+
+numval: INT;
+
+doubleval: INT '.' INT 'd'
+         | '.' INT 'd'
+         ;
+
+floatval: INT '.' INT 'f'
+        | '.' INT 'f'
+        ;
+
+stringval: STRINGVAL;
+
+listofanyvalue: listofbools
+              | listofnums
+              | listoffloats
+              | listofdoubles
+              | listofstrings
+
+              ;
+
+listofbools: LPAREN  (WHITESPACE)* bool (WHITESPACE)* (COMMA (WHITESPACE)* bool)* (WHITESPACE)* RPAREN;
+listofnums:  LPAREN  (WHITESPACE)* numval (WHITESPACE)* (COMMA (WHITESPACE)* numval)* (WHITESPACE)* RPAREN;
+listoffloats: LPAREN  (WHITESPACE)* floatval (WHITESPACE)* (COMMA (WHITESPACE)* floatval)* (WHITESPACE)* RPAREN;
+listofdoubles: LPAREN  (WHITESPACE)* doubleval (WHITESPACE)* (COMMA (WHITESPACE)* doubleval)* (WHITESPACE)* RPAREN;
+listofstrings: LPAREN  (WHITESPACE)* stringval (WHITESPACE)* (COMMA (WHITESPACE)* stringval)* (WHITESPACE)* RPAREN;
+
+endstatement: SEMICOLON;
+
+SELECT: [Ss][Ee][Ll][Ee][Cc][Tt];
+
+AS: [Aa][Ss];
+
+FROM: [Ff][Rr][Oo][Mm];
+
+WHERE: [Ww][Hh][Ee][Rr][Ee];
+
+NOT: [Nn][Oo][Tt];
+
+NULL: [Nn][Uu][Ll] [Ll];
+
+IN: [Ii][Nn];
+
+IS: [Ii][Ss];
+
+AND: [Aa][Nn][Dd];
+
+TRUE: [Tt][Rr][Uu][Ee];
+
+FALSE: [Ff][Aa][Ll][Ss][Ee];
+
+COMMA: ',';
+
+SEMICOLON: ';';
+
+EQUAL_TO: '=';
+
+GREATER_THAN: '>';
+
+LESSER_THAN: '<';
+
+GREATER_THAN_OR_EQUAL: '>=';
+
+LESSER_THAN_OR_EQUAL: '<=';
+
+LPAREN: '[';
+
+RPAREN: ']';
+
+USING: [Uu][Ss][Ii][Nn][Gg];
+
+OPTIONS: [Oo][Pp][Tt][Ii][Oo][Nn][Ss];
+
+READ_SNAPSHOT_TIME: [Rr][Ee][Aa][Dd] '_' [Ss][Nn][Aa][Pp][Ss][Hh][Oo][Tt]'_'[Tt][Ii][Mm][Ee];
+
+CONTROLTUPLE_MESSAGE: [Cc][Oo][Nn][Tt][Rr][Oo][Ll][Tt][Uu][Pp][Ll][Ee]'_'[Mm][Ee][Ss][Ss][Aa][Gg][Ee];
+
+ID: ('a'..'z' | 'A'..'Z' | '_') ('a'..'z' | 'A'..'Z' | '_' | '0'..'9')*;
+
+INT: '0' .. '9'+
+   ;
+
+WHITESPACE : ('\t' | ' ' | '\r' | '\n'| '\u000C');
+
+STRINGVAL: '"' (ESC|.)*? '"';
+
+fragment
+ESC: '\\"' | '\\\\' ;
+
+
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperator.java
new file mode 100644
index 0000000..14ea6b7
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperator.java
@@ -0,0 +1,1035 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.validation.constraints.NotNull;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.ControlAwareDefaultOutputPort;
+import org.apache.apex.malhar.kudu.partitioner.AbstractKuduInputPartitioner;
+import org.apache.apex.malhar.kudu.partitioner.KuduOneToManyPartitioner;
+import org.apache.apex.malhar.kudu.partitioner.KuduOneToOnePartitioner;
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionConsistentOrderScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionRandomOrderScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.scanner.KuduRecordWithMeta;
+import org.apache.apex.malhar.kudu.scanner.KuduScanOrderStrategy;
+import org.apache.apex.malhar.kudu.sqltranslator.KuduSQLParseTreeListener;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduTable;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.util.PojoUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <p>Abstract Kudu Input Operator that can be used to stream POJO representation of a Kudu table row by providing a
+ *  SQL expression as an input to the operator. The SQL expression is fed into the operator by implementing the
+ *   {@link AbstractKuduInputOperator#getNextQuery()}. The query processing is a continuous process wherein the
+ *    method is invoked as soon as the operator is done processing the current query</p>
+ *
+ * <p>
+ *   The following are the main features of  the Kudu input operator
+ *   <ol>
+ *     <li>The scans are performed using a SQL expression. The SQL expression is limited to the constructs
+ *      provided by the Kudu scan engine. As an example not equal to expressions are not allowed as predicates</li>
+ *     <li>The SQL expression allows for scanning with options that allow for setting the read snap shot time
+ *      and/or setting the message that will be sent as part of the control tuple after the query is sent to the
+ *      downstream operators</li>
+ *     <li>The operator allows for a custom control tuple to be sent when the query data is streamed to the downstream
+ *      operators</li>
+ *     <li>The operator is templatised.
+ *       <ol>
+ *         <li>The first template variable represents the POJO class that represents the single
+ *      row of a kudu table</li>
+ *        <li>The second template parameter represents the control tuple message that is sent to downstream operators
+ *         once a query is complete.</li>
+ *       </ol>
+ *     </li>
+ *     <li>
+ *       The input operator can be configured as a fault tolerant input operator. Configuring for fault tolerance
+ *       implies that the scan is slower but resilient for tablet server failures. See
+ *       {@link AbstractKuduInputOperator#setFaultTolerantScanner(boolean)}
+ *     </li>
+ *     <li>The number of partitions can be set via typical apex configuration or via the method
+ *     {@link AbstractKuduInputOperator#setNumberOfPartitions(int)}. The configuration value is overridden by the
+ *     API method if both are defined.</li>
+ *     <li>
+ *       A partition scan strategy can be configured. There are two types of supported scan partitioners. Default is
+ *       MANY_TABLETS_PER_OPERATOR.
+ *       <ol>
+ *         <li>MANY_TABLETS_PER_OPERATOR: Many kudu tablets are mapped to one Apex partition while scanning. See
+ *          {@link KuduOneToManyPartitioner}</li>
+ *         <li>ONE_TABLET_PER_OPERATOR: One Kudu tablet to one Apex partition. See
+ *          {@link KuduOneToOnePartitioner}</li>
+ *       </ol>
+ *     </li>
+ *     <li>A scan order strategy can also be set. This configuration allows for two types of ordering. The default
+ *     is RANDOM_ORDER_SCANNER.
+ *      <ol>
+ *        <li>CONSISTENT_ORDER_SCANNER: Ensures the scans are consistent across checkpoint and restarts. See
+ *         {@link KuduPartitionConsistentOrderScanner}</li>
+ *        <li>RANDOM_ORDER_SCANNER: Rows from different kudu tablets are streamed in parallel in different threads.
+ *         Choose this option for the highest throughput but with the downside of no ordering guarantees across
+ *          multiple launches of the operator. See {@link KuduPartitionRandomOrderScanner}</li>
+ *      </ol>
+ *     </li>
+ *     <li>Reads from Kudu tablets happen on a different thread than the main operator thread. The operator uses the
+ *      DisruptorBlockingQueue as the buffer to achieve optimal performance and high throughput. The data scanned is
+ *      sent via this buffer to the kudu input operator. Depending on the configuration, there can be multiple threads
+ *       scanning multiple kudu tablets in parallel. The input operator allows to fine tune the buffering strategy.
+ *        Set the CPU policy to SPINNING to
+ *     get the highest performance. However this setting results in showing a 100% CPU busy state on the host where
+ *      it is spinning instead of doing any useful work until there is data processing required</li>
+ *     <li>The Operator allows for exactly once semantics when resuming from a checkpoint after a crash recovery/
+ *     restart. For the exactly once semantics, the operator developer who is extending the Abstract input
+ *      operator will need to override the method
+ *      {@link AbstractKuduInputOperator#isAllowedInReconcilingWindow(KuduRecordWithMeta)}. Return true if the
+ *      row needs to be streamed downstream. Note that exactly once semantics requires a lot more
+ *       complexity in a truly distributed framework. Note that this method is called only for one window when
+ *        the operator is in a reconciling mode in the last window that it was operating in just before a crash
+ *         or a shutdown of the application.</li>
+ *     <li>The query processing is done by operator one query after another and is independent of the checkpoint
+ *      boundary. This essentially means that any given instance of time all physical instances of the operator
+ *       are not processing the same query. Downstream operators if using a unifier need to be aware of this. The
+ *        control tuple sent downstream can be used to identify the boundaries of the operator query processing
+ *         boundaries.
+ *     {@link AbstractKuduInputOperator#getNextQuery()} method is used by the child classes of this operator to
+ *      define the next SQL expression that needs to be processed.</li>
+ *     <li>Connection to the Kudu cluster is managed by specifying a
+ *      {@link ApexKuduConnection.ApexKuduConnectionBuilder}. Use this builder to fine tune various connection configs
+ *       to ensure that the optimal resources are set for the scanner</li>
+ *     <li>The tuple throughput can be controlled by using the
+ *      {@link AbstractKuduInputOperator#setMaxTuplesPerWindow(int)} to control the rate at which the
+ *       tuples are emitted in any given window</li>
+ *     <li>Note that the default window data manager is the FS based window data manger. Changing this to Noop or
+ *      other dummy implementations would result in 'EXACTLY ONCE' semantics void.</li>
+ *   </ol>
+ * </p>
+ * @since 3.8.0
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractKuduInputOperator<T,C extends InputOperatorControlTuple> implements InputOperator,
+    Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener,
+    Partitioner<AbstractKuduInputOperator>, StatsListener
+{
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduInputOperator.class);
+
+  private KuduPartitionScanStrategy partitionScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+
+  private KuduScanOrderStrategy scanOrderStrategy = KuduScanOrderStrategy.RANDOM_ORDER_SCANNER;
+
+  protected ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionInfo;
+
+  /** Represents the PIE of the total tablets that are distributed across all instances of the operator. Note that
+   *  this <b>DOES NOT</b> represent the work distribution for a given query for reasons related to churn of
+   *   partitioning. Any query that is processed is allotted a subset of this pie with the possibility of
+   *   all the query utilizing all of the pie for computing the query.
+  */
+  private List<KuduPartitionScanAssignmentMeta> partitionPieAssignment = new ArrayList<>();
+
+  protected Class<T> clazzForResultObject;
+
+  private int numberOfPartitions = -1; // set to -1 so the partition logic can see whether the user explicitly set this value
+
+  protected String tableName;
+
+  // performance fine tuning params
+
+  /**
+   *  Set this to false to increase throughput but at the risk of being Kudu tablet failure dependent
+   */
+  private boolean isFaultTolerantScanner = true;
+
+  private SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
+
+  private int bufferCapacity = 8192;
+
+  private int maxTuplesPerWindow = -1;
+
+
+  // current query fields
+  private String currentQueryBeingProcessed;
+
+  private Map<String,String> optionsEnabledForCurrentQuery;
+
+  /**
+   * Used to track the number of scantokens that are valid for this operator instance and this query. Used
+   * to track the current query processing status and whether the operator is ready to accept the next query.
+   */
+  private int plannedSegmentsForCurrentQuery = 0;
+
+  /**
+   * Used to track the completion status of each scantoken Kudu partition that was assigned for this query and this
+   * physical instance of the operator.
+   */
+  private Map<KuduPartitionScanAssignmentMeta,Boolean> currentQueryCompletionStatus = new HashMap<>();
+
+  private boolean allScansCompleteForThisQuery = true;
+
+  private boolean isPartitioned = false;
+
+  @NotNull
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+
+  private transient long currentWindowId;
+
+  private transient long reconcilingPhaseWindowId;
+
+  private transient boolean isCurrentlyInSafeMode;
+
+  private transient boolean isCurrentlyInReconcilingMode;
+
+  private transient Map<KuduPartitionScanAssignmentMeta,Long> windowManagerDataForScans;
+
+  @JsonIgnore
+  protected transient AbstractKuduInputPartitioner partitioner;
+
+  @JsonIgnore
+  protected transient AbstractKuduPartitionScanner<T,C> scanner;
+
+  private transient DisruptorBlockingQueue<KuduRecordWithMeta<T>> buffer;
+
+  private transient Map<String,String> kuduColNameToPOJOFieldNameMap;
+
+  private transient  Map<String,ColumnSchema> kuduColNameToSchemaMapping;
+
+  private transient int currentWindowTupleCount = 0;
+
+  public final transient ControlAwareDefaultOutputPort<T> outputPort = new ControlAwareDefaultOutputPort<>();
+
+  public final transient  DefaultOutputPort<String> errorPort = new DefaultOutputPort<>();
+
+  public AbstractKuduInputOperator()
+  {
+    // Used by the Kryo cloning process.
+    // Other usages would need to set the table name, class and Kudu connection using setters.
+    // Kryo would take care of these values as it does a deep copy.
+  }
+
+  public AbstractKuduInputOperator(ApexKuduConnection.ApexKuduConnectionBuilder kuduConnectionInfo,
+      Class<T> clazzForPOJO) throws Exception
+  {
+    checkNotNull(clazzForPOJO," Class definition for POJO cannot be null");
+    checkNotNull(kuduConnectionInfo, "Kudu connection info cannot be null");
+    apexKuduConnectionInfo = kuduConnectionInfo;
+    clazzForResultObject = clazzForPOJO;
+    tableName = kuduConnectionInfo.tableName;
+  }
+
+  /**
+   * Implement this method to get the next query that needs to be processed by this instance of the operator. Note
+   * that the given query is distributed across all physical instances of the operator. It is entirely possible that
+   * each physical instance of the operator is processing a different query at any given instance of time because
+   * the rows that need to be scanned might not be equally spread across all kudu partitions.
+   * As long as all instances of the physical operator get the same sequence of queries as part of the implementation
+   *  of this method, all the physical operator instances will ensure the same sequence of processing of the queries.
+   * @return The next query string that needs to be processed.
+   */
+  protected abstract String getNextQuery();
+
+
+  /**
+   * Implements the logic to process a given query string. Note that this method is invoked from two states:
+   * when the operator is resuming from a checkpointed state, and when a new query needs to be processed as
+   * part of the normal execution processing.
+   * @param queryExpression the SQL expression to parse and plan Kudu scans for
+   * @return true if the query was parsed and its scan segments were planned; false otherwise (the failing
+   *  expression is emitted on the error port)
+   */
+  protected boolean processForQueryString(String queryExpression)
+  {
+    LOG.info("Processing query {}", queryExpression);
+    SQLToKuduPredicatesTranslator parsedQuery = null;
+    try {
+      parsedQuery = new SQLToKuduPredicatesTranslator(
+        queryExpression,new ArrayList<ColumnSchema>(kuduColNameToSchemaMapping.values()));
+      parsedQuery.parseKuduExpression();
+    } catch (Exception e) {
+      LOG.error("Could not parse the SQL expression/query {}", e.getMessage(), e);
+      errorPort.emit(queryExpression);
+      return false;
+    }
+    if (parsedQuery.getErrorListener().isSyntaxError()) {
+      LOG.error("Query is an invalid query {}", queryExpression);
+      errorPort.emit(queryExpression);
+      return false;
+    }
+    if (!parsedQuery.getKuduSQLParseTreeListener().isSuccessfullyParsed()) {
+      LOG.error("Query could not be successfully parsed despite being syntactically correct {}", queryExpression);
+      errorPort.emit(queryExpression);
+      return false;
+    }
+    // NOTE(review): extractSettersForResultObject can return null when the POJO fields do not all map to
+    // query aliases; scanAllRecords is then invoked with a null setter map - confirm downstream handling.
+    Map<String,Object> setters = extractSettersForResultObject(parsedQuery);
+    try {
+      currentQueryBeingProcessed = queryExpression;
+      optionsEnabledForCurrentQuery.clear();
+      optionsEnabledForCurrentQuery.putAll(parsedQuery.getKuduSQLParseTreeListener().getOptionsUsed());
+      plannedSegmentsForCurrentQuery = scanner.scanAllRecords(parsedQuery,setters);
+    } catch (IOException e) {
+      LOG.error("Error while scanning the kudu segments {}", e.getMessage(), e);
+      errorPort.emit(queryExpression);
+      return false;
+    }
+    LOG.info("Query {} submitted for scanning", queryExpression);
+    return true;
+  }
+
+  /**
+  * Resets the processing state of data structures that are applicable at a query level, then fetches the
+  * next query via {@code getNextQuery()} and submits it for processing.
+  * @return true if the next query was successfully parsed and planned for scanning
+  */
+  protected boolean processForNextQuery()
+  {
+    LOG.info("Clearing data structures for processing query " + currentQueryBeingProcessed);
+    windowManagerDataForScans.clear();
+    currentQueryCompletionStatus.clear();
+    return processForQueryString(getNextQuery());
+  }
+
+
+  /**
+   * Scans the metadata for the kudu table that this operator is scanning for and
+   * returns back the mapping for the kudu column name to the ColumnSchema metadata definition.
+   * Note that the Kudu columns names are case sensitive. The mapping is computed once and cached.
+   * @return A Map with Kudu column names as keys and value as the Column Definition.
+   * @throws Exception if the metadata connection cannot be built or the table schema cannot be read
+   */
+  private Map<String,ColumnSchema> buildColumnSchemaForTable() throws Exception
+  {
+    if (kuduColNameToSchemaMapping == null) {
+      ApexKuduConnection connectionForMetaDataScan = apexKuduConnectionInfo.build();
+      List<ColumnSchema> tableColumns;
+      try {
+        KuduTable table = connectionForMetaDataScan.getKuduTable();
+        tableColumns = table.getSchema().getColumns();
+      } finally {
+        // Always release the short-lived metadata connection, even when the schema lookup throws.
+        connectionForMetaDataScan.close();
+      }
+      Map<String,ColumnSchema> columnSchemaMap = new HashMap<>();
+      for (ColumnSchema aColumn: tableColumns) {
+        columnSchemaMap.put(aColumn.getName(),aColumn);
+      }
+      kuduColNameToSchemaMapping = columnSchemaMap;
+    }
+    return kuduColNameToSchemaMapping;
+  }
+
+  /**
+  * Sends a begin-query control tuple (if the subclass supplies one) whenever a new query is about to be
+  * processed. A null from {@code getControlTupleForNewQuery()} means no control tuple is emitted.
+  */
+  protected void sendControlTupleForNewQuery()
+  {
+    C startNewQueryControlTuple = getControlTupleForNewQuery();
+    if (startNewQueryControlTuple != null) {
+      outputPort.emitControl(startNewQueryControlTuple);
+    }
+  }
+
+  /**
+   * Override this method to send a valid Control tuple to the downstream operator instances.
+   * @return The control tuple that needs to be sent at the beginning of the query; null (the default)
+   *  suppresses the begin-query control tuple
+   */
+  protected C getControlTupleForNewQuery()
+  {
+    return null;// default is null; Can be overridden in the child classes if custom representation is needed
+  }
+
+  /**
+   * Fetches the next value from the buffer. This also checks the limit set as the maximum tuples that can be
+   * sent downstream in a single window.
+   * @return the next Kudu row along with its metadata that needs to be processed for filters, or null when
+   *  the per-window cap has been reached or nothing was available in the buffer within the poll timeout
+   */
+  protected KuduRecordWithMeta<T> processNextTuple()
+  {
+    boolean emitTuple = true;
+    KuduRecordWithMeta<T> entryFetchedFromBuffer = null;
+    if (maxTuplesPerWindow != -1) {
+      // we have an explicit upper window. Hence we need to check and then emit
+      if (currentWindowTupleCount >= maxTuplesPerWindow) {
+        emitTuple = false;
+      }
+    }
+    if ( emitTuple) {
+      try {
+        // NOTE(review): the poll timeout is 100 microseconds - confirm this was not meant to be milliseconds.
+        entryFetchedFromBuffer = buffer.poll(100L, TimeUnit.MICROSECONDS);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so the container thread can observe the interruption.
+        Thread.currentThread().interrupt();
+        LOG.debug("No entry available in the buffer " + e.getMessage(), e);
+      }
+    }
+    return entryFetchedFromBuffer;
+  }
+
+  /***
+   * Used to filter out tuples read from the Kudu scan if they are being replayed in the windows before the
+   * reconciling window phase. This method also invokes the filter check in case the current window is in the
+   *  reconciling window phase.
+   * @param recordWithMeta the scanned record together with its tablet metadata and position in the scan
+   */
+  protected void filterTupleBasedOnCurrentState(KuduRecordWithMeta<T> recordWithMeta)
+  {
+    boolean filter = false;
+    if (isCurrentlyInSafeMode) {
+      // Safe mode: windows before the reconciling window are pure replays; emit nothing.
+      filter = true;
+    }
+    KuduPartitionScanAssignmentMeta currentRecordMeta = recordWithMeta.getTabletMetadata();
+    long currentPositionInScan = recordWithMeta.getPositionInScan();
+    if (windowManagerDataForScans.containsKey(currentRecordMeta)) {
+      long counterLimitForThisMeta = windowManagerDataForScans.get(currentRecordMeta);
+      if ( currentPositionInScan <= counterLimitForThisMeta) {
+        // This is the case of a replay and hence do not emit
+        filter = true;
+      } else {
+        windowManagerDataForScans.put(currentRecordMeta,currentPositionInScan);
+      }
+    } else {
+      // First record seen for this tablet scan; remember the high-water mark.
+      windowManagerDataForScans.put(currentRecordMeta,currentPositionInScan);
+    }
+    if ( isCurrentlyInReconcilingMode) {
+      // check to see if we can emit based on the business logic in a reconciling window processing state
+      if ( !isAllowedInReconcilingWindow(recordWithMeta)) {
+        filter = true;
+      }
+    }
+    if (!filter) {
+      outputPort.emit(recordWithMeta.getThePayload());
+      currentWindowTupleCount += 1;
+    }
+  }
+
+  /**
+   * Sends either control tuples or data tuples basing on the values being fetched from the buffer.
+   * Once all scans for the current query are complete, the next query is planned and a begin-query
+   * control tuple is emitted before any of its data tuples.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (allScansCompleteForThisQuery) {
+      if (processForNextQuery() ) {
+        sendControlTupleForNewQuery();
+        allScansCompleteForThisQuery = false;
+      }
+      return;
+    }
+    KuduRecordWithMeta<T> entryFetchedFromBuffer = processNextTuple();
+    if (entryFetchedFromBuffer != null) {
+      if ( (!entryFetchedFromBuffer.isEndOfScanMarker()) &&
+          (!entryFetchedFromBuffer.isBeginScanMarker())) {
+        // we have a valid record
+        filterTupleBasedOnCurrentState(entryFetchedFromBuffer);
+        return;
+      }
+      if (entryFetchedFromBuffer.isEndOfScanMarker()) {
+        // Bookkeeping for a completed tablet scan; may mark the whole query as complete.
+        processForEndScanMarker(entryFetchedFromBuffer);
+        sendControlTupleForEndQuery();
+      }
+      if (entryFetchedFromBuffer.isBeginScanMarker()) {
+        processForBeginScanMarker(entryFetchedFromBuffer);
+      }
+    }
+  }
+
+  /**
+   * Sends a control tuple to the downstream operators. This method checks to see if the user wishes to send a custom
+   *  control tuple. If yes, a control tuple that the user chooses to send is streamed to the downstream operators.
+   *  See {@link AbstractKuduInputOperator#getControlTupleForEndQuery()}. If this is null but the user configured a
+   *  control tuple to be sent as part of the SQL expression that is given as input, a control tuple will automatically
+   *  be generated by this operator containing the message that the user set at the time of the Query definition.
+   */
+  protected void sendControlTupleForEndQuery()
+  {
+    if ( currentQueryBeingProcessed == null) {
+      return; // no query has been processed yet; nothing to signal downstream
+    }
+    if (!allScansCompleteForThisQuery) {
+      // some tablet scans for the current query are still in flight
+      return;
+    }
+    C endControlTuple = getControlTupleForEndQuery();
+    if (endControlTuple != null) {
+      outputPort.emitControl(endControlTuple);
+    } else {
+      // Fall back to an auto-generated control tuple if the query itself requested one via its options.
+      if (optionsEnabledForCurrentQuery.containsKey(KuduSQLParseTreeListener.CONTROLTUPLE_MESSAGE)) {
+        InputOperatorControlTuple controlTuple = new InputOperatorControlTuple();
+        controlTuple.setBeginNewQuery(false);
+        controlTuple.setEndCurrentQuery(true);
+        controlTuple.setQuery(currentQueryBeingProcessed);
+        controlTuple.setControlMessage(
+            optionsEnabledForCurrentQuery.get(KuduSQLParseTreeListener.CONTROLTUPLE_MESSAGE));
+        outputPort.emitControl(controlTuple);
+      }
+    }
+  }
+
+  /**
+   * Override this method to send a custom control tuple at the end of a query.
+   * @return null by default; a non-null value is emitted as-is on the control-aware output port
+   */
+  protected C getControlTupleForEndQuery()
+  {
+    return null;// default is null; Can be overridden in the child classes if custom representation is needed
+  }
+
+  /**
+   * Registers the start of a tablet scan for the current query by marking that tablet as not yet complete.
+   * @param entryFetchedFromBuffer the begin-scan marker record for one tablet scan
+   */
+  protected void processForBeginScanMarker(KuduRecordWithMeta<T> entryFetchedFromBuffer)
+  {
+    currentQueryCompletionStatus.put(entryFetchedFromBuffer.getTabletMetadata(), false);
+  }
+
+  /**
+   * Used to see if all the segments that are planned for the current query and the current physical instance
+   * of the operator are done in terms of streaming tuples to the downstream operators. Marks the scan for the
+   * given tablet as complete and sets {@code allScansCompleteForThisQuery} once every planned segment has
+   * reported completion.
+   * @param entryFetchedFromBuffer the end-of-scan marker record for one tablet scan
+   */
+  protected void processForEndScanMarker(KuduRecordWithMeta<T> entryFetchedFromBuffer)
+  {
+    Boolean currentStatus = currentQueryCompletionStatus.get(entryFetchedFromBuffer.getTabletMetadata());
+    if (currentStatus == null) {
+      LOG.error(" End scan marker cannot precede a Begin Scan marker ");
+    }
+    currentQueryCompletionStatus.put(entryFetchedFromBuffer.getTabletMetadata(), true);
+    if ( plannedSegmentsForCurrentQuery == 0 ) {
+      // Nothing was planned for this physical instance; the query is trivially complete.
+      allScansCompleteForThisQuery = true;
+      return;
+    }
+    if (currentQueryCompletionStatus.size() != plannedSegmentsForCurrentQuery) {
+      // Not all planned segments have even begun scanning yet.
+      return;
+    }
+    boolean areAllScansComplete  = true;
+    for (KuduPartitionScanAssignmentMeta aMeta: currentQueryCompletionStatus.keySet()) {
+      if (!currentQueryCompletionStatus.get(aMeta)) {
+        areAllScansComplete = false;
+        break; // one incomplete segment is enough to decide
+      }
+    }
+    if (areAllScansComplete) {
+      allScansCompleteForThisQuery = true;
+    }
+  }
+
+  /**
+   * This method is used to give control to the user of the operator to decide how to derive exactly-once
+   * semantics. Note that this check is invoked only in a reconciling window when the operator is resuming
+   * from a crash, and in no other window.
+   * @param extractedRecord the candidate record (with scan metadata) for emission
+   * @return true if the tuple needs to be sent downstream; false if one does not want to stream this tuple
+   *  downstream.
+   */
+  protected boolean isAllowedInReconcilingWindow(KuduRecordWithMeta<T> extractedRecord)
+  {
+    return true;
+  }
+
+  /**
+   * Resets the per-window tuple counter and derives the processing mode (safe / reconciling / normal) by
+   * comparing the incoming window id with the reconciling-phase window id computed at setup time.
+   * @param windowId the id of the window that is beginning
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowTupleCount = 0;
+    currentWindowId = windowId;
+    if ( currentWindowId != Stateless.WINDOW_ID) {
+      // The three comparisons are mutually exclusive; chain them so only one branch is evaluated.
+      if (currentWindowId > reconcilingPhaseWindowId) {
+        isCurrentlyInSafeMode = false;
+        isCurrentlyInReconcilingMode = false;
+      } else if (currentWindowId == reconcilingPhaseWindowId) {
+        isCurrentlyInReconcilingMode = true;
+        isCurrentlyInSafeMode = false;
+      } else {
+        isCurrentlyInReconcilingMode = false;
+        isCurrentlyInSafeMode = true;
+      }
+    }
+    LOG.info(" Current processing mode states Safe Mode = {} Reconciling mode = {}",
+        isCurrentlyInSafeMode, isCurrentlyInReconcilingMode);
+    LOG.info(" Current window ID = {} reconciling window ID = {}", currentWindowId, reconcilingPhaseWindowId);
+  }
+
+  /**
+   * Persists the per-tablet scan positions at the end of every window so that a replay after a failure can
+   * skip tuples that were already emitted.
+   */
+  @Override
+  public void endWindow()
+  {
+    try {
+      windowDataManager.save(windowManagerDataForScans,currentWindowId);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not persist the window Data manager on end window boundary " +
+          e.getMessage(),e);
+    }
+  }
+
+  /**
+   * Initializes the operator: builds the Kudu column schema cache, the partitioner, the record buffer and the
+   * scanner, then restores any partially processed query state from the window data manager so replayed
+   * windows are handled idempotently after a failure.
+   * @param context the operator context supplied by the platform
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (windowDataManager != null) {
+      windowDataManager.setup(context);
+    }
+    try {
+      buildColumnSchemaForTable();
+    } catch (Exception e) {
+      throw new RuntimeException("Error while trying to build the schema definition for the Kudu table " + tableName,
+          e);
+    }
+    windowManagerDataForScans = new HashMap<>();
+    optionsEnabledForCurrentQuery = new HashMap<>();
+    initPartitioner();
+    initBuffer();
+    // Scanner can only be initialized after initializing the partitioner
+    initScanner(); // note that this is not streaming any data yet. Only warming up the scanner ready to read data
+    initCurrentState();
+    isCurrentlyInSafeMode = false;
+    isCurrentlyInReconcilingMode = false;
+    reconcilingPhaseWindowId = Stateless.WINDOW_ID;
+    // Guard against a null window data manager for consistency with the null check at the top of this
+    // method; the original dereferenced it unconditionally here.
+    if ( (windowDataManager != null) &&
+        (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) &&
+        context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) <
+        windowDataManager.getLargestCompletedWindow()) {
+      isCurrentlyInSafeMode = true;
+      reconcilingPhaseWindowId = windowDataManager.getLargestCompletedWindow() + 1;
+      LOG.info("Set reconciling window ID as " + reconcilingPhaseWindowId);
+      isCurrentlyInReconcilingMode = false;
+    }
+  }
+
+  /**
+   * Releases scanner resources.
+   * NOTE(review): windowDataManager.teardown() is not invoked here even though setup() calls its setup() -
+   * confirm whether that is intentional.
+   */
+  @Override
+  public void teardown()
+  {
+    scanner.close();
+  }
+
+  /**
+   * Used to read an existing state from a window data manager and initialize the starting state of this operator. Note
+   * that there might be a query that was partially processed before the checkpointing process or the crash was
+   *  triggered.
+   */
+  private void initCurrentState()
+  {
+    Map<KuduPartitionScanAssignmentMeta,Long> savedState = null;
+    if ( (windowManagerDataForScans.size() == 0) && (currentQueryBeingProcessed == null) ) {
+      // This is the case of an application restart possibly and hence want to get the exact state
+      try {
+        savedState = (Map<KuduPartitionScanAssignmentMeta,Long>)windowDataManager.retrieve(
+          windowDataManager.getLargestCompletedWindow());
+      } catch (IOException e) {
+        throw new RuntimeException("Error while retrieving the window manager data at the initialization phase", e);
+      } catch ( NullPointerException ex) {
+        // NOTE(review): catching NullPointerException appears to paper over a null windowDataManager or a
+        // null saved state; an explicit null check would be clearer - confirm the intended failure mode.
+        LOG.error("Error while getting the window manager data ", ex);
+      }
+    }
+    if ( ( savedState != null) && (savedState.size() > 0 ) ) {
+      // Recover the in-flight query from the first key (assumes all saved entries belong to one query).
+      KuduPartitionScanAssignmentMeta aMeta = savedState.keySet().iterator().next(); // we have one atleast
+      currentQueryBeingProcessed = aMeta.getCurrentQuery();
+      allScansCompleteForThisQuery = false;
+      windowManagerDataForScans.putAll(savedState);
+      processForQueryString(currentQueryBeingProcessed);
+    }
+  }
+
+  /**
+   * Delegates partition planning to the partitioner configured by the partition scan strategy.
+   */
+  @Override
+  public Collection<Partition<AbstractKuduInputOperator>> definePartitions(Collection collection,
+      PartitioningContext context)
+  {
+    initPartitioner();
+    return partitioner.definePartitions(collection,context);
+  }
+
+  /**
+   * Delegates the post-partitioning callback to the configured partitioner.
+   */
+  @Override
+  public void partitioned(Map partitions)
+  {
+    initPartitioner();
+    partitioner.partitioned(partitions);
+  }
+
+  /**
+   * Requests a repartition until this physical instance has been handed a non-empty pie assignment by the
+   * partitioner; afterwards no repartition is requested.
+   */
+  @Override
+  public Response processStats(BatchedOperatorStats stats)
+  {
+    Response response = new Response();
+    response.repartitionRequired =
+        (partitionPieAssignment == null) || (partitionPieAssignment.size() == 0);
+    return response;
+  }
+
+  /**
+   * Creates the blocking queue that decouples the scanner threads from the operator emit thread, using the
+   * configured capacity and CPU spin policy.
+   */
+  private void initBuffer()
+  {
+    buffer = new DisruptorBlockingQueue<>(bufferCapacity, cpuSpinPolicyForWaitingInBuffer);
+  }
+
+  /**
+   * Lazily instantiates the partitioner matching the configured partition scan strategy. A no-op when a
+   * partitioner instance already exists (e.g. restored by Kryo).
+   */
+  private void initPartitioner()
+  {
+    if (partitioner != null) {
+      return;
+    }
+    checkNotNull(apexKuduConnectionInfo,"Apex Kudu connection cannot be null while setting partitioner");
+    switch (partitionScanStrategy) {
+      case MANY_TABLETS_PER_OPERATOR:
+        partitioner = new KuduOneToManyPartitioner(this);
+        break;
+      case ONE_TABLET_PER_OPERATOR:
+      default:
+        partitioner = new KuduOneToOnePartitioner(this);
+        break;
+
+    }
+  }
+
+  /**
+   * Instantiates the scanner matching the configured scan order strategy. Consistent-order scanning forces
+   * the fault-tolerant scanner flag on, since consistent order scanners require fault tolerance.
+   */
+  private void initScanner()
+  {
+    switch (scanOrderStrategy) {
+      case CONSISTENT_ORDER_SCANNER:
+        setFaultTolerantScanner(true); // Consistent order scanners require them to be fault tolerant
+        scanner = new KuduPartitionConsistentOrderScanner<>(this);
+        break;
+      case RANDOM_ORDER_SCANNER:
+      default:
+        scanner = new KuduPartitionRandomOrderScanner(this);
+        break;
+    }
+  }
+
+  /***
+   *
+   * @param parsedQuery The parsed query string
+   * @return Null if the SQL expression cannot be mapped properly to the POJO fields in which case the SQL string is
+   *  sent to the Error port
+   */
+  public Map<String,Object> extractSettersForResultObject(SQLToKuduPredicatesTranslator parsedQuery)
+  {
+    Map<String,String> aliasesUsedForThisQuery = parsedQuery.getKuduSQLParseTreeListener().getAliases();
+    Map<String,Object> setterMap = new HashMap<>();
+    Field[] fieldsOfThisPojo = clazzForResultObject.getDeclaredFields();
+    Set<String> allPojoFieldNamesUsed = new HashSet<>(aliasesUsedForThisQuery.values());
+    for ( Field aField : fieldsOfThisPojo) {
+      if (!allPojoFieldNamesUsed.contains(aField.getName())) {
+        LOG.error("Invalid mapping fo Kudu table column name to the POJO field name " + aField.getName());
+        return null; // SQL expression will be sent to the error port
+      }
+    }
+    for (ColumnSchema aKuduTableColumn: kuduColNameToSchemaMapping.values()) {
+      String kuduColumnName = aKuduTableColumn.getName();
+      switch ( aKuduTableColumn.getType().getDataType().getNumber()) {
+        case Common.DataType.BINARY_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetter(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName),ByteBuffer.class));
+          break;
+        case Common.DataType.BOOL_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetterBoolean(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName)));
+          break;
+        case Common.DataType.DOUBLE_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetterDouble(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName)));
+          break;
+        case Common.DataType.FLOAT_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetterFloat(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName)));
+          break;
+        case Common.DataType.INT8_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetterByte(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName)));
+          break;
+        case Common.DataType.INT16_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetterShort(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName)));
+          break;
+        case Common.DataType.INT32_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetterInt(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName)));
+          break;
+        case Common.DataType.UNIXTIME_MICROS_VALUE:
+        case Common.DataType.INT64_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetterLong(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName)));
+          break;
+        case Common.DataType.STRING_VALUE:
+          setterMap.put(kuduColumnName,PojoUtils.createSetter(clazzForResultObject,
+              aliasesUsedForThisQuery.get(kuduColumnName),String.class));
+          break;
+        case Common.DataType.UINT8_VALUE:
+          LOG.error("Unsigned int 8 not supported yet");
+          throw new RuntimeException("uint8 not supported in Kudu schema yet");
+        case Common.DataType.UINT16_VALUE:
+          LOG.error("Unsigned int 16 not supported yet");
+          throw new RuntimeException("uint16 not supported in Kudu schema yet");
+        case Common.DataType.UINT32_VALUE:
+          LOG.error("Unsigned int 32 not supported yet");
+          throw new RuntimeException("uint32 not supported in Kudu schema yet");
+        case Common.DataType.UINT64_VALUE:
+          LOG.error("Unsigned int 64 not supported yet");
+          throw new RuntimeException("uint64 not supported in Kudu schema yet");
+        case Common.DataType.UNKNOWN_DATA_VALUE:
+          LOG.error("unknown data type ( complex types ? )  not supported yet");
+          throw new RuntimeException("Unknown data type  ( complex types ? ) not supported in Kudu schema yet");
+        default:
+          LOG.error("unknown type/default  ( complex types ? )  not supported yet");
+          throw new RuntimeException("Unknown type/default  ( complex types ? ) not supported in Kudu schema yet");
+      }
+    }
+    return setterMap;
+  }
+
+  // The lifecycle callbacks below are intentional no-ops for this operator.
+
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+
+  }
+
+  // NOTE(review): committed() does not forward to windowDataManager.committed(windowId), so recovery state
+  // is never pruned here - confirm whether that is intentional.
+  @Override
+  public void committed(long windowId)
+  {
+
+  }
+
+  // --- Simple accessors for configuration and runtime collaborators ---
+
+  public ApexKuduConnection.ApexKuduConnectionBuilder getApexKuduConnectionInfo()
+  {
+    return apexKuduConnectionInfo;
+  }
+
+  public void setApexKuduConnectionInfo(ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnection)
+  {
+    apexKuduConnectionInfo = apexKuduConnection;
+  }
+
+  public List<KuduPartitionScanAssignmentMeta> getPartitionPieAssignment()
+  {
+    return partitionPieAssignment;
+  }
+
+  public void setPartitionPieAssignment(List<KuduPartitionScanAssignmentMeta> partitionPieAssignment)
+  {
+    this.partitionPieAssignment = partitionPieAssignment;
+  }
+
+  public KuduPartitionScanStrategy getPartitionScanStrategy()
+  {
+    return partitionScanStrategy;
+  }
+
+  public void setPartitionScanStrategy(KuduPartitionScanStrategy partitionScanStrategy)
+  {
+    this.partitionScanStrategy = partitionScanStrategy;
+  }
+
+  public String getCurrentQueryBeingProcessed()
+  {
+    return currentQueryBeingProcessed;
+  }
+
+  public void setCurrentQueryBeingProcessed(String currentQueryBeingProcessed)
+  {
+    this.currentQueryBeingProcessed = currentQueryBeingProcessed;
+  }
+
+  public int getNumberOfPartitions()
+  {
+    return numberOfPartitions;
+  }
+
+  public void setNumberOfPartitions(int numberOfPartitions)
+  {
+    this.numberOfPartitions = numberOfPartitions;
+  }
+
+  public KuduScanOrderStrategy getScanOrderStrategy()
+  {
+    return scanOrderStrategy;
+  }
+
+  public void setScanOrderStrategy(KuduScanOrderStrategy scanOrderStrategy)
+  {
+    this.scanOrderStrategy = scanOrderStrategy;
+  }
+
+  public AbstractKuduInputPartitioner getPartitioner()
+  {
+    return partitioner;
+  }
+
+  public void setPartitioner(AbstractKuduInputPartitioner partitioner)
+  {
+    this.partitioner = partitioner;
+  }
+
+  public AbstractKuduPartitionScanner<T,C> getScanner()
+  {
+    return scanner;
+  }
+
+  public void setScanner(AbstractKuduPartitionScanner<T,C> scanner)
+  {
+    this.scanner = scanner;
+  }
+
+  public Class<T> getClazzForResultObject()
+  {
+    return clazzForResultObject;
+  }
+
+  public void setClazzForResultObject(Class<T> clazzForResultObject)
+  {
+    this.clazzForResultObject = clazzForResultObject;
+  }
+
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
+  {
+    return cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
+  {
+    this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public int getBufferCapacity()
+  {
+    return bufferCapacity;
+  }
+
+  public void setBufferCapacity(int bufferCapacity)
+  {
+    this.bufferCapacity = bufferCapacity;
+  }
+
+  public Map<String, String> getKuduColNameToPOJOFieldNameMap()
+  {
+    return kuduColNameToPOJOFieldNameMap;
+  }
+
+  public void setKuduColNameToPOJOFieldNameMap(Map<String, String> kuduColNameToPOJOFieldNameMap)
+  {
+    this.kuduColNameToPOJOFieldNameMap = kuduColNameToPOJOFieldNameMap;
+  }
+
+  public Map<String, ColumnSchema> getKuduColNameToSchemaMapping()
+  {
+    return kuduColNameToSchemaMapping;
+  }
+
+  public void setKuduColNameToSchemaMapping(Map<String, ColumnSchema> kuduColNameToSchemaMapping)
+  {
+    this.kuduColNameToSchemaMapping = kuduColNameToSchemaMapping;
+  }
+
+  public int getMaxTuplesPerWindow()
+  {
+    return maxTuplesPerWindow;
+  }
+
+  // A value of -1 disables the per-window cap on emitted tuples (see processNextTuple()).
+  public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
+  {
+    this.maxTuplesPerWindow = maxTuplesPerWindow;
+  }
+
+  /**
+   * Note that it is recommended that the buffer capacity be in the powers of two
+   * @param buffer
+   */
+  public void setBuffer(DisruptorBlockingQueue<KuduRecordWithMeta<T>> buffer)
+  {
+    this.buffer = buffer;
+  }
+
+  public DisruptorBlockingQueue<KuduRecordWithMeta<T>> getBuffer()
+  {
+    return buffer;
+  }
+
+  public WindowDataManager getWindowDataManager()
+  {
+    return windowDataManager;
+  }
+
+  public void setWindowDataManager(WindowDataManager windowDataManager)
+  {
+    this.windowDataManager = windowDataManager;
+  }
+
+  public boolean isFaultTolerantScanner()
+  {
+    return isFaultTolerantScanner;
+  }
+
+  // Forced to true by initScanner() when consistent-order scanning is configured.
+  public void setFaultTolerantScanner(boolean faultTolerantScanner)
+  {
+    isFaultTolerantScanner = faultTolerantScanner;
+  }
+
+  public Map<String, String> getOptionsEnabledForCurrentQuery()
+  {
+    return optionsEnabledForCurrentQuery;
+  }
+
+  public void setOptionsEnabledForCurrentQuery(Map<String, String> optionsEnabledForCurrentQuery)
+  {
+    this.optionsEnabledForCurrentQuery = optionsEnabledForCurrentQuery;
+  }
+
+  public boolean isPartitioned()
+  {
+    return isPartitioned;
+  }
+
+  public void setPartitioned(boolean partitioned)
+  {
+    isPartitioned = partitioned;
+  }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduOutputOperator.java
similarity index 97%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
rename to kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduOutputOperator.java
index ff668b0..b171d52 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduOutputOperator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -31,6 +31,7 @@ import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -48,6 +49,7 @@ import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.Statistics;
 import org.apache.kudu.client.Update;
 import org.apache.kudu.client.Upsert;
+
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
@@ -338,8 +340,7 @@ public abstract class AbstractKuduOutputOperator extends BaseOperator
     try {
       kuduSession.apply(currentOperation);
     } catch (KuduException e) {
-      LOG.error("Could not execute operation because " + e.getMessage(), e);
-      throw new RuntimeException(e.getMessage());
+      throw new RuntimeException("Could not execute operation because " + e.getMessage(), e);
     }
   }
 
@@ -443,8 +444,7 @@ public abstract class AbstractKuduOutputOperator extends BaseOperator
     try {
       windowDataManager.committed(windowId);
     } catch (IOException e) {
-      LOG.error("Error while committing the window id " + windowId + " because " + e.getMessage());
-      throw new RuntimeException(e);
+      throw new RuntimeException("Error while committing the window id " + windowId + " because " + e.getMessage(), e );
     }
   }
 
@@ -496,15 +496,14 @@ public abstract class AbstractKuduOutputOperator extends BaseOperator
     try {
       kuduSession.flush();
     } catch (KuduException e) {
-      LOG.error("Could not flush kudu session on an end window boundary " + e.getMessage(), e);
-      throw new RuntimeException(e);
+      throw new RuntimeException("Could not flush kudu session on an end window boundary " + e.getMessage(), e);
     }
     if ( currentWindowId > windowDataManager.getLargestCompletedWindow()) {
       try {
         windowDataManager.save(currentWindowId,currentWindowId);
       } catch (IOException e) {
-        LOG.error("Error while persisting the current window state " + currentWindowId + " because " + e.getMessage());
-        throw new RuntimeException(e);
+        throw new RuntimeException("Error while persisting the current window state " + currentWindowId +
+            " because " + e.getMessage(), e);
       }
     }
     numOpsErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.OPS_ERRORS) -
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/ApexKuduConnection.java
similarity index 90%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
rename to kudu/src/main/java/org/apache/apex/malhar/kudu/ApexKuduConnection.java
index 99eaf1b..4dddf82 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/ApexKuduConnection.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -34,7 +34,6 @@ import org.apache.kudu.client.SessionConfiguration;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-
 /**
  * <p>Represents a connection to the Kudu cluster. An instance of this class is to be supplied (via a builder pattern to)
  * {@link AbstractKuduOutputOperator} to connect to a Kudu cluster.</p>
@@ -52,12 +51,14 @@ public class ApexKuduConnection implements AutoCloseable, Serializable
 
   public static final Logger LOG = LoggerFactory.getLogger(ApexKuduConnection.class);
 
+  private ApexKuduConnectionBuilder builderForThisConnection;
 
   private ApexKuduConnection(ApexKuduConnectionBuilder builder)
   {
     checkNotNull(builder,"Builder cannot be null to establish kudu session");
     checkArgument(builder.mastersCollection.size() > 0, "Atleast one kudu master needs to be specified");
     checkNotNull(builder.tableName,"Kudu table cannot be null");
+    builderForThisConnection = builder;
     KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(builder.mastersCollection);
     if (builder.isOperationTimeOutSet) {
       kuduClientBuilder.defaultOperationTimeoutMs(builder.operationTimeOutMs);
@@ -86,8 +87,7 @@ public class ApexKuduConnection implements AutoCloseable, Serializable
         kuduTable = kuduClient.openTable(builder.tableName);
       }
     } catch (Exception e) {
-      LOG.error("Kudu table existence could not be ascertained  " + e.getMessage());
-      throw new RuntimeException(e.getMessage());
+      throw new RuntimeException("Kudu table existence could not be ascertained  " + e.getMessage(), e);
     }
   }
 
@@ -128,8 +128,20 @@ public class ApexKuduConnection implements AutoCloseable, Serializable
     this.kuduClient = kuduClient;
   }
 
-  public static class ApexKuduConnectionBuilder
+  public ApexKuduConnectionBuilder getBuilderForThisConnection()
+  {
+    return builderForThisConnection;
+  }
+
+  public void setBuilderForThisConnection(ApexKuduConnectionBuilder builderForThisConnection)
   {
+    this.builderForThisConnection = builderForThisConnection;
+  }
+
+  public static class ApexKuduConnectionBuilder implements Serializable
+  {
+    private static final long serialVersionUID = -3428649955056723311L;
+
     List<String> mastersCollection = new ArrayList<>();
 
     String tableName;
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/BaseKuduOutputOperator.java
similarity index 96%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
rename to kudu/src/main/java/org/apache/apex/malhar/kudu/BaseKuduOutputOperator.java
index 6c7e9a6..794d7e7 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/BaseKuduOutputOperator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.kudu.client.ExternalConsistencyMode;
 import org.apache.kudu.client.SessionConfiguration;
 
@@ -53,7 +52,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  */
 public class BaseKuduOutputOperator extends AbstractKuduOutputOperator
 {
-  public static final String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduoutputoperator.properties";
+  public static String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduoutputoperator.properties";
 
   public static final String TABLE_NAME = "tablename";
 
@@ -84,6 +83,7 @@ public class BaseKuduOutputOperator extends AbstractKuduOutputOperator
     InputStream kuduPropsFileAsStream = loader.getResourceAsStream(configFileInClasspath);
     if (kuduPropsFileAsStream != null) {
       kuduConnectionProperties.load(kuduPropsFileAsStream);
+      kuduPropsFileAsStream.close();
     } else {
       throw new IOException("Properties file required for Kudu connection " + configFileInClasspath +
       " is not locatable in the root classpath");
@@ -151,7 +151,7 @@ public class BaseKuduOutputOperator extends AbstractKuduOutputOperator
    * The default is to implement for ATLEAST_ONCE semantics. Override this control this behavior.
    * @param executionContext The tuple which represents the execution context along with the payload
    * @param reconcilingWindowId The window Id of the reconciling window
-   * @return
+   * @return True if the current row is to be written to the Kudu Store, false to be skipped
    */
   @Override
   protected boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperator.java
new file mode 100644
index 0000000..a9de1d4
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperator.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.SessionConfiguration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <p>A Kudu input operator that provides the functionality of scanning a Kudu table by incrementing
+ * a column in steps as specified in the configuration. Each iteration scans the table as given by the
+ *  SQL expression in the properties file. Other aspects like connection details are also read from the properties file.
+ * </p>
+ *
+ * <p>
+ *   Here is an example property file format.
+ *   Place a file named kuduincrementalstepscaninputoperator.properties anywhere in the classpath ( say by
+ *    bundling in the resources section ). Alternately one can use the alternative constructor to specify the filename.
+ *    The properties can be set as follows:
+ * </p>
+ *
+ * <p>
+ *   masterhosts=192.168.1.41:7051,192.168.1.54:7051
+ *   tablename=kudutablename
+ *templatequerystring=SELECT * FROM KUDUTABLE WHERE COL1 > 1234 AND TIMESTAMPCOL >= :lowerbound AND TIMESTAMPCOL< :upperbound
+ *   templatequerylowerboundparamname = :lowerbound
+ *   templatequeryupperboundparamname = :upperbound
+ *   templatequeryseedvalue = 1503001033219
+ *   templatequeryincrementalvalue = 120000
+ *
+ *   The above property file will scan a table called KUDUTABLE and only scan those rows for which a column named COL1
+ *   is greater than 1234. It also uses a column named TIMESTAMPCOL to start with the starting value of the lower
+ *   bound as 1503001033219 and increments the scan window by 2 minutes. Note that :lowerbound and :upperbound are
+ *   just template names that will be used for string substitution as the time windows progress.
+ *
+ *   Note that the implementation assumes the templated column data type is of type long. Considering other types
+ *    is a trivial extension and perhaps best achieved by extending the Abstract implementation.
+ * </p>
+ */
+public class IncrementalStepScanInputOperator<T,C extends InputOperatorControlTuple>
+    extends AbstractKuduInputOperator<T,C>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(IncrementalStepScanInputOperator.class);
+
+  // The incremental value of the template column as read from the properties file
+  private long stepUpValue;
+
+  // The initial value of the templated column as read from the properties file
+  private long startSeedValue;
+
+  private long currentLowerBound;
+
+  private long currentUpperBound;
+
+  private String queryTemplateString;
+
+  // The name of the parameter in the templated string that represents the lower bound
+  private String lowerBoundParamName;
+
+  // The name of the parameter in the templated string that represents the upperbound
+  private String upperBoundParamName;
+
+  public static String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduincrementalstepscaninputoperator.properties";
+
+  public static final String TABLE_NAME = "tablename";
+
+  public static final String MASTER_HOSTS = "masterhosts";
+
+  public static final String POJO_CLASS_NAME = "pojoclassname";
+
+  // The name of the property in properties file that gives the query expression as a template
+  public static final String TEMPLATE_QUERY_STRING = "templatequerystring";
+
+  // The name of the property that specifies the lower bound name that will be used for incrementing
+  public static final String TEMPLATE_QUERY_LOWERBOUND_PARAMNAME = "templatequerylowerboundparamname";
+
+  // The name of the property that specifies the lower bound name that will be used for incrementing
+  public static final String TEMPLATE_QUERY_UPPERBOUND_PARAMNAME = "templatequeryupperboundparamname";
+
+  // The name of the property that specifies the value of the seed/starting value for the templated column
+  public static final String TEMPLATE_QUERY_SEED_VALUE = "templatequeryseedvalue";
+
+  // The name of the property that specifies the value of the increment for the templated column
+  public static final String TEMPLATE_QUERY_INCREMENT_STEP_VALUE = "templatequeryincrementalvalue";
+
+  private ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder;
+
+  /***
+   * To be used only by the Kryo deserialization logic and not encouraged for generic use.
+   * @throws IOException
+   * @throws NumberFormatException
+   */
+  public IncrementalStepScanInputOperator() throws IOException, NumberFormatException
+  {
+    initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME);
+  }
+
+  public IncrementalStepScanInputOperator(Class<T> classForPojo, String configFileInClasspath)
+    throws IOException, NumberFormatException
+  {
+    initConnectionBuilderProperties(configFileInClasspath);
+    clazzForResultObject = classForPojo;
+  }
+
+  /***
+   * Instantiates all of the common configurations. Also does the initiation logic for the mandatory fields in super
+   * class
+   * @param configFileInClasspath The properties file name
+   * @throws IOException If not able to read properties file
+   * @throws NumberFormatException If not able to format string as long
+   */
+  private void initConnectionBuilderProperties(String configFileInClasspath)
+    throws IOException, NumberFormatException
+  {
+    Properties incrementalStepProperties = new Properties();
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    InputStream kuduPropsFileAsStream = loader.getResourceAsStream(configFileInClasspath);
+    if (kuduPropsFileAsStream != null) {
+      incrementalStepProperties.load(kuduPropsFileAsStream);
+    } else {
+      if (!DEFAULT_CONNECTION_PROPS_FILE_NAME.equalsIgnoreCase(configFileInClasspath)) {
+        throw new IOException("Properties file required for Kudu connection " + configFileInClasspath +
+          " is not locatable in the root classpath");
+      } else {
+        LOG.warn("Properties file could not be loaded. Expecting the user to set properties manually");
+      }
+    }
+    tableName = checkNotNull(incrementalStepProperties.getProperty(TABLE_NAME));
+    String masterHostsConnectionString = checkNotNull(incrementalStepProperties.getProperty(MASTER_HOSTS));
+    String[] masterAndHosts = masterHostsConnectionString.split(",");
+    lowerBoundParamName = checkNotNull(incrementalStepProperties.getProperty(
+        TEMPLATE_QUERY_LOWERBOUND_PARAMNAME));
+    upperBoundParamName = checkNotNull(incrementalStepProperties.getProperty(
+        TEMPLATE_QUERY_UPPERBOUND_PARAMNAME));
+    String stepUpValueString = checkNotNull(incrementalStepProperties.getProperty(TEMPLATE_QUERY_INCREMENT_STEP_VALUE));
+    String seedValueString =  checkNotNull(incrementalStepProperties.getProperty(TEMPLATE_QUERY_SEED_VALUE));
+    queryTemplateString = checkNotNull(incrementalStepProperties.getProperty(TEMPLATE_QUERY_STRING));
+    startSeedValue = Long.valueOf(seedValueString);
+    stepUpValue = Long.valueOf(stepUpValueString);
+    currentLowerBound = startSeedValue;
+    currentUpperBound = currentLowerBound + startSeedValue;
+    initKuduConfig(tableName, Arrays.asList(masterAndHosts));
+  }
+
+  /***
+   * A simple init of the kudu connection config. Override this if you would like to fine-tune the connection parameters
+   * @param kuduTableName The Kudu table name
+   * @param kuduMasters The master hosts of the Kudu cluster.
+   */
+  public void initKuduConfig(String kuduTableName, List<String> kuduMasters)
+  {
+    apexKuduConnectionBuilder = new ApexKuduConnection.ApexKuduConnectionBuilder()
+      .withTableName(kuduTableName)
+      .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
+      .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
+      .withNumberOfBossThreads(1)
+      .withNumberOfWorkerThreads(2)
+      .withSocketReadTimeOutAs(3000)
+      .withOperationTimeOutAs(3000);
+    for ( String aMasterAndHost: kuduMasters ) {
+      apexKuduConnectionBuilder = apexKuduConnectionBuilder.withAPossibleMasterHostAs(aMasterAndHost);
+    }
+    apexKuduConnectionInfo = apexKuduConnectionBuilder;
+  }
+
+  /***
+   * A simple template replacement logic which provides the next window of the query by replacing the
+   * lower and upper bound parameters accordingly.
+   * @return Returns the next in line query as a String that is compliant with the Kudu SQL grammar.
+   */
+  @Override
+  protected String getNextQuery()
+  {
+    long lowerBoundToUseForNextIteration = currentLowerBound;
+    long upperBoundToUseForNextIteration = currentUpperBound;
+    String queryToUse = new String(queryTemplateString);
+    queryToUse = queryToUse.replace(lowerBoundParamName,("" + lowerBoundToUseForNextIteration));
+    queryToUse = queryToUse.replace(upperBoundParamName,("" + upperBoundToUseForNextIteration));
+    currentLowerBound = currentUpperBound;
+    currentUpperBound = currentLowerBound + stepUpValue;
+    return queryToUse;
+  }
+
+  public long getStepUpValue()
+  {
+    return stepUpValue;
+  }
+
+  public void setStepUpValue(long stepUpValue)
+  {
+    this.stepUpValue = stepUpValue;
+  }
+
+  public long getStartSeedValue()
+  {
+    return startSeedValue;
+  }
+
+  public void setStartSeedValue(long startSeedValue)
+  {
+    this.startSeedValue = startSeedValue;
+    currentLowerBound = startSeedValue;
+    currentUpperBound = currentLowerBound + startSeedValue;
+  }
+
+  public String getQueryTemplateString()
+  {
+    return queryTemplateString;
+  }
+
+  public void setQueryTemplateString(String queryTemplateString)
+  {
+    this.queryTemplateString = queryTemplateString;
+  }
+
+  public String getLowerBoundParamName()
+  {
+    return lowerBoundParamName;
+  }
+
+  public void setLowerBoundParamName(String lowerBoundParamName)
+  {
+    this.lowerBoundParamName = lowerBoundParamName;
+  }
+
+  public String getUpperBoundParamName()
+  {
+    return upperBoundParamName;
+  }
+
+  public void setUpperBoundParamName(String upperBoundParamName)
+  {
+    this.upperBoundParamName = upperBoundParamName;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/InputOperatorControlTuple.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/InputOperatorControlTuple.java
new file mode 100644
index 0000000..b591acf
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/InputOperatorControlTuple.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import org.apache.apex.api.operator.ControlTuple;
+
+/**
+ * A simple control tuple class that is used to represent a begin or end of a given SQL expression.
+ */
+public class InputOperatorControlTuple implements ControlTuple
+{
+  private String query;
+
+  private boolean beginNewQuery;
+
+  private boolean endCurrentQuery;
+
+  private String controlMessage;
+
+  @Override
+  public DeliveryType getDeliveryType()
+  {
+    return DeliveryType.IMMEDIATE;
+  }
+
+  public String getQuery()
+  {
+    return query;
+  }
+
+  public void setQuery(String query)
+  {
+    this.query = query;
+  }
+
+  public boolean isBeginNewQuery()
+  {
+    return beginNewQuery;
+  }
+
+  public void setBeginNewQuery(boolean beginNewQuery)
+  {
+    this.beginNewQuery = beginNewQuery;
+  }
+
+  public boolean isEndCurrentQuery()
+  {
+    return endCurrentQuery;
+  }
+
+  public void setEndCurrentQuery(boolean endCurrentQuery)
+  {
+    this.endCurrentQuery = endCurrentQuery;
+  }
+
+  public String getControlMessage()
+  {
+    return controlMessage;
+  }
+
+  public void setControlMessage(String controlMessage)
+  {
+    this.controlMessage = controlMessage;
+  }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/KuduExecutionContext.java
similarity index 98%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
rename to kudu/src/main/java/org/apache/apex/malhar/kudu/KuduExecutionContext.java
index a346705..ca6b2ff 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/KuduExecutionContext.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
 
 
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/KuduMutationType.java
similarity index 95%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/KuduMutationType.java
index 64b46c6..eda0ac5 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/KuduMutationType.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
 /**
  * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/package-info.java
similarity index 54%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/package-info.java
index 64b46c6..5f54ba3 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/package-info.java
@@ -16,17 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
-
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+/***
+ * Kudu package contains the Operator implementations for both input and output. There is a base implementation for the
+ *  Output Operator. {@link org.apache.apex.malhar.kudu.BaseKuduOutputOperator}. For custom configurability use
+ *  {@link org.apache.apex.malhar.kudu.AbstractKuduOutputOperator}. Similarly there is a default implementation for the
+ *  Kudu Input operator. {@link org.apache.apex.malhar.kudu.IncrementalStepScanInputOperator}. Extend the
+ *  {@link org.apache.apex.malhar.kudu.AbstractKuduInputOperator} for configurability.
  */
-public enum KuduMutationType
-{
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.kudu;
 
-    INSERT,
-    DELETE,
-    UPDATE,
-    UPSERT
-}
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitioner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitioner.java
new file mode 100644
index 0000000..de3859b
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitioner.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.ApexKuduConnection;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduTable;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * An abstract class that contains logic that is common across all partitioners available for the Kudu input operator.
+ */
+public abstract class AbstractKuduInputPartitioner implements Partitioner<AbstractKuduInputOperator>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduInputPartitioner.class);
+
+  @JsonIgnore
+  protected AbstractKuduInputOperator prototypeKuduInputOperator;
+
+  public AbstractKuduInputPartitioner(AbstractKuduInputOperator prototypeOperator)
+  {
+    prototypeKuduInputOperator = prototypeOperator;
+  }
+
+  /**
+   * Builds a set of scan tokens. The list of scan tokens are generated as if the entire table is being scanned
+   * i.e. a SELECT * FROM TABLE equivalent expression. This list is used to assign the partition pie assignments
+   * for all of the planned partition of operators. Each operator gets a part of the PIE as if all columns were
+   * selected. Subsequently when a query is to be processed, the query is used to generate the scan tokens applicable
+   * for that query. Given that partition pie represents the entire data set, the scan assignments for the current
+   * query will be a subset.
+   * @return The list of scan tokens as if the entire table is getting scanned.
+   * @throws Exception in cases when the connection to kudu cluster cannot be closed.
+   */
+  public List<KuduScanToken> getKuduScanTokensForSelectAllColumns() throws Exception
+  {
+    // We are not using the current query for deciding the partition strategy but a SELECT * as
+    // we do not want to optimize on just the current query. This prevents rapid throttling of operator
+    // instances when the scan patterns are erratic. On the other hand, this might result on under utilized
+    // operator resources in the DAG but will be consistent at a minimum.
+    ApexKuduConnection apexKuduConnection = prototypeKuduInputOperator.getApexKuduConnectionInfo().build();
+    KuduClient clientHandle = apexKuduConnection.getKuduClient();
+    KuduTable table = apexKuduConnection.getKuduTable();
+    KuduScanToken.KuduScanTokenBuilder builder = clientHandle.newScanTokenBuilder(table);
+    List<String> allColumns = new ArrayList<>();
+    List<ColumnSchema> columnList = apexKuduConnection.getKuduTable().getSchema().getColumns();
+    for ( ColumnSchema column : columnList) {
+      allColumns.add(column.getName());
+    }
+    builder.setProjectedColumnNames(allColumns);
+    LOG.debug("Building the partition pie assignments for the input operator");
+    List<KuduScanToken> allPossibleTokens = builder.build();
+    apexKuduConnection.close();
+    return allPossibleTokens;
+  }
+
+  /***
+   * Returns the number of partitions. The configuration is used as the source of truth for
+   * number of partitions. The config is then overridden if the code in the client driver code explicitly sets
+   * the number of partitions.
+   * @param context The context as provided by the launcher
+   * @return The number of partitions that are planned for this input operator. At a minimum it is 1.
+   */
+  public int getNumberOfPartitions(PartitioningContext context)
+  {
+    int proposedPartitionCount = context.getParallelPartitionCount();
+    if ( prototypeKuduInputOperator.getNumberOfPartitions() != -1 ) { // -1 is the default
+      // There is a manual override of partitions from code. Use this
+      proposedPartitionCount = prototypeKuduInputOperator.getNumberOfPartitions();
+      LOG.info(" Set the partition count based on the code as opposed to configuration ");
+    }
+    if ( proposedPartitionCount <= 0)  {
+      // Parallel partitions not enabled. But the design is to use one to many mapping. Hence defaulting to one
+      LOG.info(" Defaulting to one partition as parallel partitioning is not enabled");
+      proposedPartitionCount = 1;
+    }
+    LOG.info(" Planning to use " + proposedPartitionCount + " partitions");
+    return proposedPartitionCount;
+  }
+
+  /***
+   * Builds a list of scan assignment metadata instances from raw kudu scan tokens as returned by the Kudu Query planner
+   *  assuming all of the columns and rows are to be scanned
+   * @param partitions The current set of partitions
+   * @param context The current partitioning context
+   * @return The new set of partitions
+   * @throws Exception if the Kudu connection opened for generating the scan plan cannot be closed
+   */
+  public List<KuduPartitionScanAssignmentMeta> getListOfPartitionAssignments(
+      Collection<Partition<AbstractKuduInputOperator>> partitions, PartitioningContext context) throws Exception
+  {
+    List<KuduPartitionScanAssignmentMeta> returnList = new ArrayList<>();
+    List<KuduScanToken> allColumnsScanTokens = new ArrayList<>();
+    // we are looking at a first time invocation scenario
+    try {
+      allColumnsScanTokens.addAll(getKuduScanTokensForSelectAllColumns());
+    } catch (Exception e) {
+      LOG.error(" Error while calculating the number of scan tokens for all column projections " + e.getMessage(),e);
+    }
+    if ( allColumnsScanTokens.size() == 0 ) {
+      LOG.error("No column information could be extracted from the Kudu table");
+      throw new Exception("No column information could be extracted from the Kudu table");
+    }
+    int totalPartitionCount = allColumnsScanTokens.size();
+    LOG.info("Determined maximum as " + totalPartitionCount + " tablets for this table");
+    for (int i = 0; i < totalPartitionCount; i++) {
+      KuduPartitionScanAssignmentMeta aMeta = new KuduPartitionScanAssignmentMeta();
+      aMeta.setOrdinal(i);
+      aMeta.setTotalSize(totalPartitionCount);
+      returnList.add(aMeta);
+      LOG.info("A planned scan meta of the total partitions " + aMeta);
+    }
+    LOG.info("Total kudu partition size is " + returnList.size());
+    return returnList;
+  }
+
+  /***
+   * Clones the original operator and sets the partition pie assignments for this operator. Kryo is used for cloning
+   * @param scanAssignmentsForThisPartition The partition pie that is assigned to the operator according to the
+   *                                        configured partitioner
+   * @return The partition that is used by the runtime to launch the new physical operator instance
+   */
+  public Partitioner.Partition<AbstractKuduInputOperator> clonePartitionAndAssignScanMeta(
+      List<KuduPartitionScanAssignmentMeta> scanAssignmentsForThisPartition)
+  {
+    Partitioner.Partition<AbstractKuduInputOperator> clonedKuduInputOperator =
+        new DefaultPartition<>(KryoCloneUtils.cloneObject(prototypeKuduInputOperator));
+    clonedKuduInputOperator.getPartitionedInstance().setPartitionPieAssignment(scanAssignmentsForThisPartition);
+    return clonedKuduInputOperator;
+  }
+
+  /***
+   * Implements the main logic for defining partitions. The logic to create a partition is pretty straightforward. A
+   * SELECT * expression is used to generate a plan ( a list of kudu scan tokens). The list is then evenly
+   * distributed to each of the physical operator instances as defined by the configuration. At runtime when a new query
+   * needs to be processed, a plan specific to the query is generated and the planned list of kudu scan tokens
+   * for that query are distributed among the available operator instances. Note that we define the partitions only once
+   * during the first invocation. This is because it is ideal that we do not optimize on the current query that is running
+   * before we can dynamically partition and cause too much throttling based on the query patterns. Please see
+   * {@link AbstractKuduPartitionScanner#preparePlanForScanners(SQLToKuduPredicatesTranslator)} for details as to how
+   * a query based planning is done.
+   * @param partitions The current set of partitions
+   * @param context The partitioning context as provided by the runtime platform.
+   * @return The collection of planned partitions as implemented by the partitioner that is configured by the user
+   */
+  @Override
+  public Collection<Partition<AbstractKuduInputOperator>> definePartitions(
+      Collection<Partition<AbstractKuduInputOperator>> partitions, PartitioningContext context)
+  {
+    if ( partitions != null) {
+      LOG.info("The current partitioner plan has " + partitions.size() + " operators before redefining");
+    }
+    List<Partition<AbstractKuduInputOperator>> partitionsForInputOperator = new ArrayList<>();
+    List<KuduPartitionScanAssignmentMeta> assignmentMetaList = new ArrayList<>();
+    try {
+      assignmentMetaList.addAll(getListOfPartitionAssignments(partitions,context));
+    } catch (Exception e) {
+      throw new RuntimeException("Aborting partition planning as Kudu meta data could not be obtained", e);
+    }
+    LOG.info("Maximum possible Kudu input operator partition count is " + assignmentMetaList.size());
+    Map<Integer,List<KuduPartitionScanAssignmentMeta>> assignments = assign(assignmentMetaList,context);
+    boolean requiresRepartitioning = false;
+    if ( partitions == null) {
+      requiresRepartitioning = true;
+    } else {
+      for ( Partition<AbstractKuduInputOperator> aPartition : partitions) {
+        if ( !aPartition.getPartitionedInstance().isPartitioned()) {
+          requiresRepartitioning = true;
+          break;
+        }
+      }
+    }
+    if ( requiresRepartitioning ) {
+      partitions.clear();
+      LOG.info("Clearing all of the current partitions and setting up new ones");
+      partitions.clear();
+      for ( int i = 0; i < assignments.size(); i++) {
+        List<KuduPartitionScanAssignmentMeta> assignmentForThisOperator = assignments.get(i);
+        partitionsForInputOperator.add(clonePartitionAndAssignScanMeta(assignmentForThisOperator));
+        LOG.info("Assigned apex operator " + i + " with " + assignmentForThisOperator.size() + " kudu mappings");
+      }
+      LOG.info("Returning " + partitionsForInputOperator.size() + " partitions for the input operator");
+      return partitionsForInputOperator;
+    } else {
+      LOG.info("Not making any changes to the partitions");
+      return partitions;
+    }
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractKuduInputOperator>> partitions)
+  {
+
+  }
+
+  /***
+   * Abstract method that will be implemented by the individual partition planners basing on the functionality they
+   * provide. Please see {@link KuduOneToManyPartitioner} and {@link KuduOneToOnePartitioner} for specific
+   * implementations
+   * @param totalList The total list of possible tablet scans for all queries
+   * @param context The context that is provided by the framework when repartitioning is to be executed
+   * @return A map that gives the operator number as key and the list of {@link KuduPartitionScanAssignmentMeta} that
+   * are assigned for the operator with that number.
+   */
+  public abstract  Map<Integer, List<KuduPartitionScanAssignmentMeta>> assign(
+      List<KuduPartitionScanAssignmentMeta> totalList, PartitioningContext context);
+
+  public AbstractKuduInputOperator getPrototypeKuduInputOperator()
+  {
+    return prototypeKuduInputOperator;
+  }
+
+  public void setPrototypeKuduInputOperator(AbstractKuduInputOperator prototypeKuduInputOperator)
+  {
+    this.prototypeKuduInputOperator = prototypeKuduInputOperator;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitioner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitioner.java
new file mode 100644
index 0000000..1313778
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitioner.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+
+
+/**
+ * Used when a user would like to assign multiple kudu tablets to a single physical instance of the Kudu input operator
+ */
+public class KuduOneToManyPartitioner extends AbstractKuduInputPartitioner
+{
+  private static final Logger LOG = LoggerFactory.getLogger(KuduOneToManyPartitioner.class);
+
+  public KuduOneToManyPartitioner(AbstractKuduInputOperator prototypeOperator)
+  {
+    super(prototypeOperator);
+  }
+
+  /**
+   * Distributes the tablets evenly among the physical instances of the kudu input operator. Please see
+   * {@link AbstractKuduPartitionScanner#preparePlanForScanners(SQLToKuduPredicatesTranslator)} for details as to how
+   * a query based planning is done.
+   * @param totalList The total list of possible tablet scans for all queries
+   * @param context The context that is provided by the framework when repartitioning is to be executed
+   * @return Map with key as operator id and value as a list of assignments that can be assigned to that operator.
+   */
+  @Override
+  public Map<Integer, List<KuduPartitionScanAssignmentMeta>> assign(List<KuduPartitionScanAssignmentMeta> totalList,
+      PartitioningContext context)
+  {
+    Map<Integer,List<KuduPartitionScanAssignmentMeta>> partitionAssignments = new HashMap<>();
+    int partitionCount = getNumberOfPartitions(context);
+    if ( partitionCount <= 0) {
+      LOG.error(" Partition count cannot be zero ");
+      partitionCount = 1; // set to a minimum of one.
+    }
+    int idealDistributionRatio = (totalList.size() / partitionCount) + 1;
+    LOG.info(" Distributing not more than " + idealDistributionRatio + " partitions per input operator");
+    int counterForLoopingTotal = 0;
+    int totalSizeOfKuduScanAssignments = totalList.size();
+    for ( int i = 0; i < partitionCount; i++) {
+      partitionAssignments.put(i, new ArrayList<KuduPartitionScanAssignmentMeta>());
+    }
+    // We round robin all of the scan assignments so that all of the apex partitioned operators are a part of the effort
+    while ( counterForLoopingTotal < totalSizeOfKuduScanAssignments) {
+      for (int i = 0; i < partitionCount; i++) {
+        List<KuduPartitionScanAssignmentMeta> assignmentsForThisOperatorId = partitionAssignments.get(i);
+        if (counterForLoopingTotal < totalSizeOfKuduScanAssignments) { // take care of non-optimal distribution ratios
+          assignmentsForThisOperatorId.add(totalList.get(counterForLoopingTotal));
+          counterForLoopingTotal += 1;
+        } else {
+          break;
+        }
+      }
+    }
+    return partitionAssignments;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitioner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitioner.java
new file mode 100644
index 0000000..e477e83
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitioner.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+
+/**
+ * A partitioner that assigns one to one mapping of each kudu tablet to one physical instance of Kudu input operator.
+ */
+public class KuduOneToOnePartitioner extends AbstractKuduInputPartitioner
+{
+  private static final Logger LOG = LoggerFactory.getLogger(KuduOneToOnePartitioner.class);
+
+  public KuduOneToOnePartitioner(AbstractKuduInputOperator prototypeOperator)
+  {
+    super(prototypeOperator);
+  }
+
+  /***
+   * Takes the total list of possible partition scans from a SELECT * expression and then distributes one
+   * tablet per operator id. Note that the operator id is just an integer representation in this method. See
+   * {@link AbstractKuduInputPartitioner#definePartitions(Collection, PartitioningContext)} where
+   * this method is used to assign the plan to the actual operator instances. Please see
+   * {@link AbstractKuduPartitionScanner#preparePlanForScanners(SQLToKuduPredicatesTranslator)} for details as to how
+   * a query based planning is done.
+   * @param totalList The total list of possible tablet scans for all queries
+   * @param context The context that is provided by the framework when repartitioning is to be executed
+   * @return A Map of an operator identifier to the list of partition assignments. Note the operator identifier is a
+   * simple ordinal numbering of the operator and not the actual operator id.
+   */
+  @Override
+  public Map<Integer, List<KuduPartitionScanAssignmentMeta>> assign(List<KuduPartitionScanAssignmentMeta> totalList,
+      PartitioningContext context)
+  {
+    Map<Integer,List<KuduPartitionScanAssignmentMeta>> partitionAssignments = new HashMap<>();
+    for ( int i = 0; i < totalList.size(); i++) {
+      List<KuduPartitionScanAssignmentMeta> assignmentForThisPartition = new ArrayList<>();
+      assignmentForThisPartition.add(totalList.get(i));
+      partitionAssignments.put(i,assignmentForThisPartition);
+    }
+    return partitionAssignments;
+  }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduPartitionScanStrategy.java
similarity index 74%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduPartitionScanStrategy.java
index 64b46c6..c7e1bc4 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduPartitionScanStrategy.java
@@ -16,17 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu.partitioner;
 
 /**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+ * Used to select the partition scan strategy: one kudu tablet per Apex operator instance, or many tablets per instance.
  */
-public enum KuduMutationType
+public enum KuduPartitionScanStrategy
 {
-
-    INSERT,
-    DELETE,
-    UPDATE,
-    UPSERT
+  ONE_TABLET_PER_OPERATOR,
+  MANY_TABLETS_PER_OPERATOR
 }
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/package-info.java
similarity index 58%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/package-info.java
index 64b46c6..cb54abc 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/package-info.java
@@ -16,17 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
-
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+/***
+ * The partitioner package classes are responsible for creating the right number of Apex operators for a given kudu
+ *  table that is being scanned. There are two configuration modes that are provided. Map one Kudu partition to
+ *  one Apex operator or alternately map multiple Kudu tablets to one Apex operator. The bulk of the implementation
+ *   is in the {@link org.apache.apex.malhar.kudu.partitioner.AbstractKuduInputPartitioner}.
  */
-public enum KuduMutationType
-{
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.kudu.partitioner;
 
-    INSERT,
-    DELETE,
-    UPDATE,
-    UPSERT
-}
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScanner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScanner.java
new file mode 100644
index 0000000..b998f60
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScanner.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.ApexKuduConnection;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.sqltranslator.KuduSQLParseTreeListener;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.kudu.client.AsyncKuduScanner;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An abstract class that contains logic common to all types of Scanner. A scanner is responsible for scanning rows from
+ * the kudu table based on the incoming SQL query. See {@link KuduPartitionConsistentOrderScanner}
+ *  and {@link KuduPartitionRandomOrderScanner} for options available as scanners.
+ */
+public abstract class AbstractKuduPartitionScanner<T,C extends InputOperatorControlTuple>
+{
+  @JsonIgnore
+  AbstractKuduInputOperator<T,C> parentOperator;
+
+  ExecutorService kuduConsumerExecutor;
+
+  ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionInfo;
+
+  int threadPoolExecutorSize = 1;
+
+  Map<Integer, ApexKuduConnection> connectionPoolForThreads = new HashMap<>();
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduPartitionScanner.class);
+
+  /***
+   * Executes all of the scans planned for the current input operator. Note that the current implementations are
+   * threaded implementations. Depending on the scanner, there are multiple threads or single thread
+   * @param parsedQuery Instance of the parser that parsed the incoming query.
+   * @param setters Setters that are resolved for the POJO class that represents the Kudu row
+   * @return The total number of records that have been scanned for the passed query
+   * @throws IOException
+   */
+  public abstract int scanAllRecords(SQLToKuduPredicatesTranslator parsedQuery,
+      Map<String,Object> setters) throws IOException;
+
+  public AbstractKuduPartitionScanner(AbstractKuduInputOperator<T,C> parentOperatorRunningThisScanner)
+  {
+    parentOperator = parentOperatorRunningThisScanner;
+    apexKuduConnectionInfo = parentOperator.getApexKuduConnectionInfo();
+  }
+
+  /***
+   * Implements common initialization logic across all types of scanners. Currently it is 1. Creating the thread pool
+   *  service for executors and 2. Creating the requisite connection pool for the scanners to use.
+   */
+  public void initScannerCommons()
+  {
+    kuduConsumerExecutor = Executors.newFixedThreadPool(threadPoolExecutorSize);
+    List<KuduPartitionScanAssignmentMeta> allPartitionsThatNeedScan = parentOperator.getPartitionPieAssignment();
+    Collections.sort(allPartitionsThatNeedScan,
+        new Comparator<KuduPartitionScanAssignmentMeta>()
+      {
+        @Override
+        public int compare(KuduPartitionScanAssignmentMeta left, KuduPartitionScanAssignmentMeta right)
+        {
+          return left.getOrdinal() - right.getOrdinal();
+        }
+      });
+    for ( int i = 0; i < threadPoolExecutorSize; i++) {
+      connectionPoolForThreads.put(i,apexKuduConnectionInfo.build());
+    }
+    LOG.info("Scanner running with " + connectionPoolForThreads.size() + " kudu connections");
+  }
+
+  /***
+   * Used to renew a connection in case it is dead. There can be scenarios when there is a continuous sequence of
+   * queries that do not touch a tablet resulting in inactivity on the kudu client session.
+   * @param indexPos The index position in the connection pool
+   * @return A renewed connection in case it was dead
+   */
+  public ApexKuduConnection verifyConnectionStaleness(int indexPos)
+  {
+    ApexKuduConnection apexKuduConnection = connectionPoolForThreads.get(indexPos);
+    checkNotNull(apexKuduConnection, "Null connection not expected while checking staleness of" +
+        " existing connection");
+    if (apexKuduConnection.getKuduSession().isClosed()) {
+      try {
+        apexKuduConnection.close(); // closes the wrapper
+      } catch (Exception e) {
+        LOG.error(" Could not close a possibly stale kudu connection handle ", e);
+      }
+      LOG.info("Ripped the old kudu connection out and building a new connection for this scanner");
+      ApexKuduConnection newConnection =  apexKuduConnection.getBuilderForThisConnection().build();
+      connectionPoolForThreads.put(indexPos,newConnection);
+      return newConnection;
+    } else {
+      return apexKuduConnection;
+    }
+  }
+
+  /***
+   * The main logic which takes the parsed-in query and builds the Kudu scan tokens specific to this query.
+   * It makes sure that these scan tokens are sorted before the actual scan tokens that are to be executed in the
+   * current physical instance of the operator are shortlisted. Since the kudu scan token builder gives the scan
+   * tokens for the query and does not differentiate between a distributed system and a single instance system, this
+   * method takes the plan as generated by the Kudu scan token builder and then chooses only those segments that were
+   * decided to be the responsibility of this operator at partitioning time.
+   * @param parsedQuery The parsed query instance
+   * @return A list of partition scan metadata objects that are applicable for this instance of the physical operator
+   * i.e. the operator owning this instance of the scanner.
+   * @throws IOException If the scan assignment cannot be serialized
+   */
+  public List<KuduPartitionScanAssignmentMeta> preparePlanForScanners(SQLToKuduPredicatesTranslator parsedQuery)
+    throws IOException
+  {
+    List<KuduPredicate> predicateList = parsedQuery.getKuduSQLParseTreeListener().getKuduPredicateList();
+    ApexKuduConnection apexKuduConnection = verifyConnectionStaleness(0);// we will have at least one connection
+    KuduScanToken.KuduScanTokenBuilder builder = apexKuduConnection.getKuduClient().newScanTokenBuilder(
+        apexKuduConnection.getKuduTable());
+    builder = builder.setProjectedColumnNames(new ArrayList<>(
+        parsedQuery.getKuduSQLParseTreeListener().getListOfColumnsUsed()));
+    for (KuduPredicate aPredicate : predicateList) {
+      builder = builder.addPredicate(aPredicate);
+    }
+    builder.setFaultTolerant(parentOperator.isFaultTolerantScanner());
+    Map<String,String> optionsUsedForThisQuery = parentOperator.getOptionsEnabledForCurrentQuery();
+    if ( optionsUsedForThisQuery.containsKey(KuduSQLParseTreeListener.READ_SNAPSHOT_TIME)) {
+      try {
+        long readSnapShotTime = Long.valueOf(optionsUsedForThisQuery.get(KuduSQLParseTreeListener.READ_SNAPSHOT_TIME));
+        builder = builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT);
+        builder = builder.snapshotTimestampMicros(readSnapShotTime);
+        LOG.info("Using read snapshot for this query as " + readSnapShotTime);
+      } catch ( Exception ex) {
+        LOG.error("Cannot parse the Read snaptshot time " + ex.getMessage(), ex);
+      }
+    }
+    List<KuduScanToken> allPossibleScanTokens = builder.build();
+    Collections.sort(allPossibleScanTokens, // Make sure we deal with a sorted list of scan tokens
+        new Comparator<KuduScanToken>()
+      {
+        @Override
+        public int compare(KuduScanToken left, KuduScanToken right)
+        {
+          return left.compareTo(right);
+        }
+      });
+    LOG.info(" Query will scan " + allPossibleScanTokens.size() + " tablets");
+    if ( LOG.isDebugEnabled()) {
+      LOG.debug(" Predicates scheduled for this query are " + predicateList.size());
+      for ( int i = 0; i < allPossibleScanTokens.size(); i++) {
+        LOG.debug("A tablet scheduled for all operators scanning is " + allPossibleScanTokens.get(i).getTablet());
+      }
+    }
+    List<KuduPartitionScanAssignmentMeta> partitionPieForThisOperator = parentOperator.getPartitionPieAssignment();
+    List<KuduPartitionScanAssignmentMeta> returnOfAssignments = new ArrayList<>();
+    int totalScansForThisQuery = allPossibleScanTokens.size();
+    int counterForPartAssignments = 0;
+    for (KuduPartitionScanAssignmentMeta aPartofThePie : partitionPieForThisOperator) {
+      if ( aPartofThePie.getOrdinal() < totalScansForThisQuery) { // a given query plan might have fewer scan tokens
+        KuduPartitionScanAssignmentMeta aMetaForThisQuery = new KuduPartitionScanAssignmentMeta();
+        aMetaForThisQuery.setTotalSize(totalScansForThisQuery);
+        aMetaForThisQuery.setOrdinal(counterForPartAssignments);
+        counterForPartAssignments += 1;
+        aMetaForThisQuery.setCurrentQuery(parsedQuery.getSqlExpresssion());
+        // we pick up only those ordinals that are part of the original partition pie assignment
+        KuduScanToken aTokenForThisOperator = allPossibleScanTokens.get(aPartofThePie.getOrdinal());
+        aMetaForThisQuery.setSerializedKuduScanToken(aTokenForThisOperator.serialize());
+        returnOfAssignments.add(aMetaForThisQuery);
+        LOG.debug("Added query scan for this operator " + aMetaForThisQuery + " with scan tablet as " +
+            allPossibleScanTokens.get(aPartofThePie.getOrdinal()).getTablet());
+      }
+    }
+    LOG.info(" A total of " + returnOfAssignments.size() + " have been scheduled for this operator");
+    return returnOfAssignments;
+  }
+
+  /***
+   * Closes the connection pool that is being maintained for each of the scanners.
+   */
+  public void close()
+  {
+    for ( int i = 0; i < connectionPoolForThreads.size(); i++) {
+      try {
+        connectionPoolForThreads.get(i).close();
+      } catch (Exception e) {
+        LOG.error("Error while closing kudu connection ",e);
+      }
+    }
+    kuduConsumerExecutor.shutdown();
+  }
+
+  public AbstractKuduInputOperator<T, C> getParentOperator()
+  {
+    return parentOperator;
+  }
+
+  public void setParentOperator(AbstractKuduInputOperator<T, C> parentOperator)
+  {
+    this.parentOperator = parentOperator;
+  }
+
+  public Map<Integer, ApexKuduConnection> getConnectionPoolForThreads()
+  {
+    return connectionPoolForThreads;
+  }
+
+  public void setConnectionPoolForThreads(Map<Integer, ApexKuduConnection> connectionPoolForThreads)
+  {
+    this.connectionPoolForThreads = connectionPoolForThreads;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionConsistentOrderScanner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionConsistentOrderScanner.java
new file mode 100644
index 0000000..a3ed595
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionConsistentOrderScanner.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <p>A scanner implementation that scans kudu tablet rows in a consistent order. The ordered nature is guaranteed by
+ *  the following aspects.</p>
+ * <ol>
+ *   <li>All scan tokens for a given query are always ordered</li>
+ *   <li>A given operator always takes a select set of ordinal scan tokens from the given list of scan tokens as per step 1 above</li>
+ *   <li>The consistent order scanner always scans one tablet after another ( and hence less performant compared to
+ *   that of the Random order scanner {@link KuduPartitionRandomOrderScanner}) </li>
+ * </ol>
+ */
+public class KuduPartitionConsistentOrderScanner<T,C extends InputOperatorControlTuple>
+    extends AbstractKuduPartitionScanner<T,C>
+{
+
+  public KuduPartitionConsistentOrderScanner(AbstractKuduInputOperator<T,C> parentOperator)
+  {
+    super(parentOperator);
+    threadPoolExecutorSize = 1;
+    initScannerCommons();
+  }
+
+  @Override
+  public int scanAllRecords(SQLToKuduPredicatesTranslator parsedQuery,
+      Map<String,Object> setters) throws IOException
+  {
+    List<KuduPartitionScanAssignmentMeta> plannedScansForthisQuery = preparePlanForScanners(parsedQuery);
+    kuduConsumerExecutor.submit(new SequentialScannerThread(parsedQuery,setters,plannedScansForthisQuery));
+    return plannedScansForthisQuery.size();
+  }
+
+  /**
+   * A simple thread that ensures that all of the scan tokens are executed one after another. We need this thread
+   * so that {@link KuduPartitionConsistentOrderScanner#scanAllRecords(SQLToKuduPredicatesTranslator, Map)} does
+   * not get blocked, as it is called during an emitTuple() call. The Future returns the number of records scanned.
+   */
+  public class SequentialScannerThread implements Callable<Long>
+  {
+    SQLToKuduPredicatesTranslator parsedQuery;
+
+    Map<String,Object> settersForThisQuery;
+
+    List<KuduPartitionScanAssignmentMeta> scansForThisQuery;
+
+    ExecutorService executorServiceForSequentialScanner = Executors.newFixedThreadPool(1);
+
+    public SequentialScannerThread(SQLToKuduPredicatesTranslator parsedQueryTree,Map<String,Object> setters,
+        List<KuduPartitionScanAssignmentMeta> plannedScans)
+    {
+      checkNotNull(parsedQueryTree,"Parsed SQL expression cannot be null for scanner");
+      checkNotNull(setters,"Setters cannot be null for the scanner thread");
+      checkNotNull(plannedScans,"Planned scan segments cannot be null for scanner thread");
+      parsedQuery = parsedQueryTree;
+      settersForThisQuery = setters;
+      scansForThisQuery = plannedScans;
+    }
+
+    @Override
+    public Long call() throws Exception
+    {
+      long overallCount = 0;
+      int counterForMeta = 0;
+      for ( KuduPartitionScanAssignmentMeta aMeta : scansForThisQuery) {
+        KuduPartitionScannerCallable<T,C> aScanJobThread = new KuduPartitionScannerCallable<T,C>(parentOperator,aMeta,
+            verifyConnectionStaleness(counterForMeta),settersForThisQuery,parsedQuery);
+        counterForMeta += 1;
+        Future<Long> scanResult = executorServiceForSequentialScanner.submit(aScanJobThread);
+        overallCount += scanResult.get(); // block till we complete this chunk;
+      }
+      executorServiceForSequentialScanner.shutdown(); // release resources
+      return overallCount;
+    }
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionRandomOrderScanner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionRandomOrderScanner.java
new file mode 100644
index 0000000..9054c46
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionRandomOrderScanner.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+
+/**
+ * <p>A scanner implementation that scans kudu tablet rows in an order which does not guarantee the same
+ * sequence of tuples in case of restart after a crash/kill. The unordered nature of the scans stems from
+ * an eager approach to scanning kudu tablets: all the thread pool based threads scan in parallel and hence
+ * ordering cannot be guaranteed to be the same for each run on the same data set.</p>
+ */
+public class KuduPartitionRandomOrderScanner<T,C extends InputOperatorControlTuple>
+    extends AbstractKuduPartitionScanner<T,C>
+{
+
+  public KuduPartitionRandomOrderScanner(AbstractKuduInputOperator<T,C> parentOperator)
+  {
+    super(parentOperator);
+    threadPoolExecutorSize = parentOperator.getPartitionPieAssignment().size();
+    initScannerCommons();
+  }
+
+  /***
+   * Scans all the records for the given query by launching a parallel scan of all the tablets that can
+   * serve the data for the given query.
+   * @param parsedQuery Instance of the parser that parsed the incoming query.
+   * @param setters Setters that are resolved for the POJO class that represents the Kudu row
+   * @return Total number of rows scanned
+   * @throws IOException When the Kudu connection cannot be closed after preparing the plan.
+   */
+  @Override
+  public int scanAllRecords(SQLToKuduPredicatesTranslator parsedQuery, Map<String, Object> setters) throws IOException
+  {
+    int counterForMeta = 0;
+    List<KuduPartitionScanAssignmentMeta> preparedScans = preparePlanForScanners(parsedQuery);
+    for ( KuduPartitionScanAssignmentMeta aMeta : preparedScans) {
+      KuduPartitionScannerCallable<T,C> aScanJobThread = new KuduPartitionScannerCallable<T,C>(parentOperator,aMeta,
+          verifyConnectionStaleness(counterForMeta),setters,parsedQuery);
+      counterForMeta += 1;
+      kuduConsumerExecutor.submit(aScanJobThread);
+    }
+    return preparedScans.size();
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScanAssignmentMeta.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScanAssignmentMeta.java
new file mode 100644
index 0000000..1875aaa
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScanAssignmentMeta.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * The main metadata class that is used to hold the information about the scan token and also the ordinal
+ * position out of the total.
+ */
+public class KuduPartitionScanAssignmentMeta implements Serializable
+{
+  private static final long serialVersionUID = -3074453209476815690L;
+
+  private String currentQuery;
+
+  private byte[] serializedKuduScanToken;
+
+  private int ordinal;
+
+  private int totalSize;
+
+  public String getCurrentQuery()
+  {
+    return currentQuery;
+  }
+
+  public void setCurrentQuery(String currentQuery)
+  {
+    this.currentQuery = currentQuery;
+  }
+
+  public byte[] getSerializedKuduScanToken()
+  {
+    return serializedKuduScanToken;
+  }
+
+  public void setSerializedKuduScanToken(byte[] serializedKuduScanToken)
+  {
+    this.serializedKuduScanToken = serializedKuduScanToken;
+  }
+
+  public int getOrdinal()
+  {
+    return ordinal;
+  }
+
+  public void setOrdinal(int ordinal)
+  {
+    this.ordinal = ordinal;
+  }
+
+  public int getTotalSize()
+  {
+    return totalSize;
+  }
+
+  public void setTotalSize(int totalSize)
+  {
+    this.totalSize = totalSize;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof KuduPartitionScanAssignmentMeta)) {
+      return false;
+    }
+
+    KuduPartitionScanAssignmentMeta that = (KuduPartitionScanAssignmentMeta)o;
+
+    if (getOrdinal() != that.getOrdinal()) {
+      return false;
+    }
+    if (getTotalSize() != that.getTotalSize()) {
+      return false;
+    }
+    return getCurrentQuery().equals(that.getCurrentQuery());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = getCurrentQuery().hashCode();
+    result = 31 * result + getOrdinal();
+    result = 31 * result + getTotalSize();
+    return result;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KuduPartitionScanAssignmentMeta{" +
+      "currentQuery='" + currentQuery + '\'' +
+      ", serializedKuduScanToken=" + Arrays.toString(serializedKuduScanToken) +
+      ", ordinal=" + ordinal +
+      ", totalSize=" + totalSize +
+      '}';
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallable.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallable.java
new file mode 100644
index 0000000..e664de2
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallable.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.ApexKuduConnection;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+
+import com.datatorrent.lib.util.PojoUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/***
+ * A callable implementation that is responsible for scanning all rows that are scannable from a given
+ * scan token range from a single Kudu tablet. The rows are sent to the input operator buffer so that they
+ * can be emitted as windows progress.
+ */
+public class KuduPartitionScannerCallable<T,C extends InputOperatorControlTuple> implements Callable<Long>
+{
+  private AbstractKuduInputOperator<T,C> operatorUsingThisScanner;
+
+  private KuduPartitionScanAssignmentMeta kuduPartitionScanAssignmentMeta;
+
+  private BlockingQueue<KuduRecordWithMeta<T>> bufferForTransmittingRecords;
+
+  private Class<T> clazzForResultObject;
+
+  private transient KuduClient kuduClientHandle;
+
+  private Map<String,ColumnSchema> tableSchema;
+
+  private Map<String,Object> settersForThisQueryScan;
+
+  private SQLToKuduPredicatesTranslator parsedQuery;
+
+
+  private static final Logger LOG = LoggerFactory.getLogger(KuduPartitionScannerCallable.class);
+
+  public KuduPartitionScannerCallable(AbstractKuduInputOperator<T,C> kuduInputOperator,
+      KuduPartitionScanAssignmentMeta partitionMeta, ApexKuduConnection apexKuduConnection,Map<String,Object> setters,
+      SQLToKuduPredicatesTranslator parsedQueryInstance)
+  {
+    checkNotNull(kuduInputOperator,"Kudu operator instance cannot be null in the kudu scanner thread");
+    checkNotNull(partitionMeta, "Partition metadata cannot be null in kudu scanner thread");
+    checkNotNull(apexKuduConnection,"Kudu connection cannot be null in the kudu scanner thread");
+    checkNotNull(apexKuduConnection,"Setters cannot be null in the kudu scanner thread");
+    checkNotNull(parsedQueryInstance, "parsed Query instance cannot be null");
+    operatorUsingThisScanner = kuduInputOperator;
+    kuduPartitionScanAssignmentMeta = partitionMeta;
+    bufferForTransmittingRecords = kuduInputOperator.getBuffer();
+    clazzForResultObject = kuduInputOperator.getClazzForResultObject();
+    checkNotNull(apexKuduConnection,"Kudu connection cannot be null when initializing scanner");
+    kuduClientHandle = apexKuduConnection.getKuduClient();
+    checkNotNull(kuduClientHandle,"Kudu client cannot be null when initializing scanner");
+    tableSchema = kuduInputOperator.getKuduColNameToSchemaMapping();
+    settersForThisQueryScan = setters;
+    parsedQuery = parsedQueryInstance;
+  }
+
+  public void setValuesInPOJO(RowResult aRow, T payload)
+  {
+    Set<String> columnsUsed = parsedQuery.getKuduSQLParseTreeListener().getListOfColumnsUsed();
+    for (String aColumnName : columnsUsed) {
+      ColumnSchema schemaForThisColumn = tableSchema.get(aColumnName);
+      if (aRow.isNull(aColumnName)) {
+        continue;
+      }
+      switch ( schemaForThisColumn.getType().getDataType().getNumber()) {
+        case Common.DataType.BINARY_VALUE:
+          ((PojoUtils.Setter<T,ByteBuffer>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getBinary(aColumnName));
+          break;
+        case Common.DataType.STRING_VALUE:
+          ((PojoUtils.Setter<T,String>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getString(aColumnName));
+          break;
+        case Common.DataType.BOOL_VALUE:
+          ((PojoUtils.SetterBoolean<T>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getBoolean(aColumnName));
+          break;
+        case Common.DataType.DOUBLE_VALUE:
+          ((PojoUtils.SetterDouble<T>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getDouble(aColumnName));
+          break;
+        case Common.DataType.FLOAT_VALUE:
+          ((PojoUtils.SetterFloat<T>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getFloat(aColumnName));
+          break;
+        case Common.DataType.INT8_VALUE:
+          ((PojoUtils.SetterByte<T>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getByte(aColumnName));
+          break;
+        case Common.DataType.INT16_VALUE:
+          ((PojoUtils.SetterShort<T>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getShort(aColumnName));
+          break;
+        case Common.DataType.INT32_VALUE:
+          ((PojoUtils.SetterInt<T>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getInt(aColumnName));
+          break;
+        case Common.DataType.UNIXTIME_MICROS_VALUE:
+        case Common.DataType.INT64_VALUE:
+          ((PojoUtils.SetterLong<T>)settersForThisQueryScan.get(aColumnName)).set(
+              payload,aRow.getLong(aColumnName));
+          break;
+        case Common.DataType.UINT8_VALUE:
+          LOG.error("Unsigned int 8 not supported yet");
+          throw new RuntimeException("uint8 not supported in Kudu schema yet");
+        case Common.DataType.UINT16_VALUE:
+          LOG.error("Unsigned int 16 not supported yet");
+          throw new RuntimeException("uint16 not supported in Kudu schema yet");
+        case Common.DataType.UINT32_VALUE:
+          LOG.error("Unsigned int 32 not supported yet");
+          throw new RuntimeException("uint32 not supported in Kudu schema yet");
+        case Common.DataType.UINT64_VALUE:
+          LOG.error("Unsigned int 64 not supported yet");
+          throw new RuntimeException("uint64 not supported in Kudu schema yet");
+        case Common.DataType.UNKNOWN_DATA_VALUE:
+          LOG.error("unknown data type ( complex types ? )  not supported yet");
+          throw new RuntimeException("Unknown data type  ( complex types ? ) not supported in Kudu schema yet");
+        default:
+          LOG.error("unknown type/default  ( complex types ? )  not supported yet");
+          throw new RuntimeException("Unknown type/default  ( complex types ? ) not supported in Kudu schema yet");
+      }
+    }
+  }
+
+  @Override
+  public Long call() throws Exception
+  {
+    long numRowsScanned = 0;
+    KuduScanner aPartitionSpecificScanner = KuduScanToken.deserializeIntoScanner(
+        kuduPartitionScanAssignmentMeta.getSerializedKuduScanToken(), kuduClientHandle);
+    LOG.info("Scanning the following tablet " + KuduScanToken.stringifySerializedToken(kuduPartitionScanAssignmentMeta
+        .getSerializedKuduScanToken(), kuduClientHandle));
+    KuduRecordWithMeta<T> beginScanRecord = new KuduRecordWithMeta<>();
+    beginScanRecord.setBeginScanMarker(true);
+    beginScanRecord.setTabletMetadata(kuduPartitionScanAssignmentMeta);
+    bufferForTransmittingRecords.add(beginScanRecord); // Add a record entry that denotes the end of this scan.
+    while ( aPartitionSpecificScanner.hasMoreRows()) {
+      LOG.debug("Number of columns being returned for this read " +
+          aPartitionSpecificScanner.getProjectionSchema().getColumnCount());
+      RowResultIterator resultIterator = aPartitionSpecificScanner.nextRows();
+      if (resultIterator == null) {
+        break;
+      } else {
+        while (resultIterator.hasNext()) {
+          KuduRecordWithMeta<T> recordWithMeta = new KuduRecordWithMeta<>();
+          RowResult aRow = resultIterator.next();
+          recordWithMeta.setPositionInScan(numRowsScanned);
+          T payload = clazzForResultObject.newInstance();
+          recordWithMeta.setThePayload(payload);
+          recordWithMeta.setEndOfScanMarker(false);
+          recordWithMeta.setTabletMetadata(kuduPartitionScanAssignmentMeta);
+          setValuesInPOJO(aRow,payload);
+          bufferForTransmittingRecords.add(recordWithMeta);
+          numRowsScanned += 1;
+        }
+      }
+    }
+    aPartitionSpecificScanner.close();
+    KuduRecordWithMeta<T> endScanRecord = new KuduRecordWithMeta<>();
+    endScanRecord.setEndOfScanMarker(true);
+    endScanRecord.setTabletMetadata(kuduPartitionScanAssignmentMeta);
+    bufferForTransmittingRecords.add(endScanRecord); // Add a record entry that denotes the end of this scan.
+    LOG.info(" Scanned a total of " + numRowsScanned + " for this scanner thread @tablet " +
+        KuduScanToken.stringifySerializedToken(kuduPartitionScanAssignmentMeta.getSerializedKuduScanToken(),
+        kuduClientHandle));
+    return numRowsScanned;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduRecordWithMeta.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduRecordWithMeta.java
new file mode 100644
index 0000000..77b7314
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduRecordWithMeta.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+/**
+ * Represents a Kudu row and metadata for the row that was consumed by the Kudu scanner client.
+ * The metadata includes information like the ordinal position for this scan instance, the query and the scan
+ * token was scheduled.
+ */
+public class KuduRecordWithMeta<T>
+{
+  private T thePayload;
+
+  private long positionInScan;
+
+  private boolean isEndOfScanMarker;
+
+  private boolean isBeginScanMarker;
+
+  private KuduPartitionScanAssignmentMeta tabletMetadata;
+
+
+  public T getThePayload()
+  {
+    return thePayload;
+  }
+
+  public void setThePayload(T thePayload)
+  {
+    this.thePayload = thePayload;
+  }
+
+  public long getPositionInScan()
+  {
+    return positionInScan;
+  }
+
+  public void setPositionInScan(long positionInScan)
+  {
+    this.positionInScan = positionInScan;
+  }
+
+  public boolean isEndOfScanMarker()
+  {
+    return isEndOfScanMarker;
+  }
+
+  public void setEndOfScanMarker(boolean endOfScanMarker)
+  {
+    isEndOfScanMarker = endOfScanMarker;
+  }
+
+  public KuduPartitionScanAssignmentMeta getTabletMetadata()
+  {
+    return tabletMetadata;
+  }
+
+  public void setTabletMetadata(KuduPartitionScanAssignmentMeta tabletMetadata)
+  {
+    this.tabletMetadata = tabletMetadata;
+  }
+
+  public boolean isBeginScanMarker()
+  {
+    return isBeginScanMarker;
+  }
+
+  public void setBeginScanMarker(boolean beginScanMarker)
+  {
+    isBeginScanMarker = beginScanMarker;
+  }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduScanOrderStrategy.java
similarity index 74%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduScanOrderStrategy.java
index 64b46c6..130a8ef 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduScanOrderStrategy.java
@@ -16,17 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu.scanner;
 
 /**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+ * Used to define the scan order strategy when multiple scan tokens are assigned to a single Apex partition
  */
-public enum KuduMutationType
+public enum KuduScanOrderStrategy
 {
-
-    INSERT,
-    DELETE,
-    UPDATE,
-    UPSERT
+  CONSISTENT_ORDER_SCANNER,
+  RANDOM_ORDER_SCANNER
 }
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/package-info.java
similarity index 51%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/package-info.java
index 64b46c6..6904a7d 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/package-info.java
@@ -16,17 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
-
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+/***
+ * The scanner package holds classes responsible for scanning kudu tablets as per the provided predicates. There are
+ *  two types of scanner model provided. {@link org.apache.apex.malhar.kudu.scanner.KuduPartitionConsistentOrderScanner}
+ *   provides a consistent ordering of tuples. This is to be used when exactly-once processing semantics are
+ *   required in the downstream operators. The second model,
+ *   {@link org.apache.apex.malhar.kudu.scanner.KuduPartitionRandomOrderScanner}, provides a throughput oriented
+ *   implementation and cannot be used when exactly-once semantics are required in the downstream operators.
  */
-public enum KuduMutationType
-{
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.kudu.scanner;
 
-    INSERT,
-    DELETE,
-    UPDATE,
-    UPSERT
-}
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLExpressionErrorListener.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLExpressionErrorListener.java
new file mode 100644
index 0000000..5ff08c5
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLExpressionErrorListener.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.antlr.v4.runtime.BaseErrorListener;
+import org.antlr.v4.runtime.RecognitionException;
+import org.antlr.v4.runtime.Recognizer;
+
+/**
+ * A simple error listener that is plugged into the Kudu expression parser
+ */
+public class KuduSQLExpressionErrorListener extends BaseErrorListener
+{
+  private boolean syntaxError = false;
+
+  private List<String> listOfErrorMessages = new ArrayList<>();
+
+  @Override
+  public void syntaxError(Recognizer<?, ?> recognizer, Object offendingSymbol, int line, int charPositionInLine,
+      String msg, RecognitionException e)
+  {
+    super.syntaxError(recognizer, offendingSymbol, line, charPositionInLine, msg, e);
+    syntaxError = true;
+    listOfErrorMessages.add(msg);
+  }
+
+  public boolean isSyntaxError()
+  {
+    return syntaxError;
+  }
+
+  public void setSyntaxError(boolean syntaxError)
+  {
+    this.syntaxError = syntaxError;
+  }
+
+  public List<String> getListOfErrorMessages()
+  {
+    return listOfErrorMessages;
+  }
+
+  public void setListOfErrorMessages(List<String> listOfErrorMessages)
+  {
+    this.listOfErrorMessages = listOfErrorMessages;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParseTreeListener.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParseTreeListener.java
new file mode 100644
index 0000000..a1e0ec5
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParseTreeListener.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.antlr.v4.runtime.tree.ParseTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.sqlparser.KuduSQLExpressionBaseListener;
+import org.apache.apex.malhar.kudu.sqlparser.KuduSQLExpressionParser;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduPredicate;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class builds Kudu equivalent scanner logic from a supplied SQL expression.
+ * Note that the SQL expression is set in {@code KuduSQLParser}
+ * The parser class uses a "tree walker" to walk the parsed tree and, while doing so, registers this class.
+ * This listener then starts building the Kudu scanner constructs as it listens to the tree walk steps.
+ * Note that it only overrides the required methods from its base class {@code KuduSQLExpressionBaseListener}.
+ * The KuduSQLExpressionBaseListener class is an autogenerated class by the Antlr4 compiler
+ * */
+public class KuduSQLParseTreeListener extends KuduSQLExpressionBaseListener
+{
+  private boolean isSuccessfullyParsed = true;
+
+  private boolean isFilterExpressionEnabled = false;
+
+  private boolean isOptionsEnabled = false;
+
+  private Set<String> listOfColumnsUsed = new HashSet<>();
+
+  private Map<String,String> aliases = new HashMap<>();
+
+  private Map<String,String> optionsUsed = new HashMap<>();
+
+  private List<KuduPredicate> kuduPredicateList = new ArrayList<>();
+
+  private Map<String,ColumnSchema> columnSchemaLookups = new HashMap<>();
+
+  private String tableName = null;
+
+  private boolean isSelectStarExpressionEnabled = false;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(KuduSQLParseTreeListener.class);
+
+  private String controlTupleMessage = null;
+
+  private Long readSnapshotTime = null;
+
+  public static final String READ_SNAPSHOT_TIME = "read_snapshot_time";
+
+  public static final String CONTROLTUPLE_MESSAGE = "controltuple_message";
+
+  private void clearAll()
+  {
+    listOfColumnsUsed.clear();
+    aliases.clear();
+    optionsUsed.clear();
+    kuduPredicateList.clear();
+    tableName = null;
+    columnSchemaLookups.clear();
+  }
+
+  public void setColumnSchemaList(List<ColumnSchema> listOfColumnsForCurrentTable)
+  {
+    Preconditions.checkNotNull(listOfColumnsForCurrentTable,"Column schemas " +
+        "cannot be null for kudu table");
+    for (ColumnSchema aColumnDef : listOfColumnsForCurrentTable) {
+      columnSchemaLookups.put(aColumnDef.getName(),aColumnDef);
+      aliases.put(aColumnDef.getName(),aColumnDef.getName()); // By default each column is its own alias in POJO.
+    }
+  }
+
+  @Override
+  public void exitKudusqlexpression(KuduSQLExpressionParser.KudusqlexpressionContext ctx)
+  {
+    LOG.debug(" Scanning " + listOfColumnsUsed.size() + " columns from " + tableName + " table");
+    LOG.debug(" Number of predicates that are part of the scan are " + kuduPredicateList.size());
+    LOG.debug(" Select * expression enabled " + isSelectStarExpressionEnabled);
+    LOG.debug(" Filter expression enabled " + isFilterExpressionEnabled);
+    LOG.debug(" Are options being enabled  " + isOptionsEnabled);
+    LOG.debug(" Read snapshot time being used as " + readSnapshotTime);
+    LOG.debug(" Control tuple message being used as " + controlTupleMessage);
+  }
+
+  @Override
+  public void exitSELECT_ID_ONLY_USED_AS_COLUMN_NAME(KuduSQLExpressionParser.SELECT_ID_ONLY_USED_AS_COLUMN_NAMEContext
+      ctx)
+  {
+    super.exitSELECT_ID_ONLY_USED_AS_COLUMN_NAME(ctx);
+    KuduSQLExpressionParser.IdorcolumnnameContext ctxForIDOrColumnName = ctx.idorcolumnname();
+    if (ctxForIDOrColumnName != null ) { // we can ignore complex expressions as they are resolved recursively
+      // We are not inside as complex comma separated expression
+      String columnName = extractColumnNameFromContext(ctxForIDOrColumnName);
+      if ( columnName != null) {
+        if (columnSchemaLookups.containsKey(columnName)) {
+          LOG.debug(" Kudu column being enabled for scanning " + columnName );
+          aliases.put(columnName,columnName);
+          listOfColumnsUsed.add(columnName);
+        } else {
+          LOG.error("Invalid column  being set as a scanner ( column names are case sensitive ). " + columnName);
+          isSuccessfullyParsed = false;
+        }
+      }
+    }
+  }
+
+
+  @Override
+  public void exitALL_COLUMNS_SELECT_EXP(KuduSQLExpressionParser.ALL_COLUMNS_SELECT_EXPContext ctx)
+  {
+    super.exitALL_COLUMNS_SELECT_EXP(ctx);
+    listOfColumnsUsed.addAll(columnSchemaLookups.keySet());
+    isSelectStarExpressionEnabled = true;
+  }
+
+  private String extractColumnNameFromContext(KuduSQLExpressionParser.IdorcolumnnameContext ctxForIDOrColumnName)
+  {
+    if (ctxForIDOrColumnName.ID() != null) {
+      // we are dealing with a non-reserved keyword as a columnname.
+      return ctxForIDOrColumnName.ID().getSymbol().getText();
+    } else {
+      for (int i = 0; i < ctxForIDOrColumnName.getChildCount(); i++) {
+        // iterate all the terminal node children to see wihich keyword is used. Note that there are spaces possible
+        ParseTree terminalNodeTree = ctxForIDOrColumnName.getChild(i);
+        String childNodeText = terminalNodeTree.getText();
+        if ( (!childNodeText.equalsIgnoreCase(" ")) &&
+            (!childNodeText.equalsIgnoreCase("'"))
+            ) {
+          return childNodeText; // Anything other thant a quote or whitespace is the column name ( reserved )
+        }
+      } // end for loop for terminal node children
+    } // conditional for non-ID column names
+    return null;
+  }
+
+  @Override
+  public void exitSELECT_ALIAS_USED(KuduSQLExpressionParser.SELECT_ALIAS_USEDContext ctx)
+  {
+    super.exitSELECT_ALIAS_USED(ctx);
+    KuduSQLExpressionParser.IdorcolumnnameContext ctxForIDOrColumnName = ctx.idorcolumnname();
+    if (ctxForIDOrColumnName != null ) { // we can ignore complex expressions as they are resolved recursively
+      // We are not inside as complex comma separated expression
+      String columnName = extractColumnNameFromContext(ctxForIDOrColumnName);
+      if ( columnName != null) {
+        if (columnSchemaLookups.containsKey(columnName)) {
+          LOG.debug(" Kudu column being enabled for scanning " + columnName );
+          LOG.debug(columnName + " is being scanned as " + ctx.ID().getSymbol().getText());
+          aliases.put(columnName,ctx.ID().getSymbol().getText());
+          listOfColumnsUsed.add(columnName);
+        } else {
+          LOG.error("Invalid column  being set as a scanner ( column names are case sensitive ). " + columnName);
+          isSuccessfullyParsed = false;
+        }
+      }
+    } // end null check only for id or column names
+  }
+
+  @Override
+  public void exitTableclause(KuduSQLExpressionParser.TableclauseContext ctx)
+  {
+    super.exitTableclause(ctx);
+    tableName = ctx.ID().getText();
+  }
+
+  @Override
+  public void exitWhereclause(KuduSQLExpressionParser.WhereclauseContext ctx)
+  {
+    super.exitWhereclause(ctx);
+    isFilterExpressionEnabled = true;
+  }
+
+  @Override
+  public void exitWithoptionsclause(KuduSQLExpressionParser.WithoptionsclauseContext ctx)
+  {
+    super.exitWithoptionsclause(ctx);
+    isOptionsEnabled = true;
+  }
+
+  /**
+   * Builds a Kudu comparison predicate for the given column after validating that the
+   * literal in the parse tree is compatible with the Kudu column data type. On a type
+   * mismatch the parse is marked as failed and null is returned.
+   * @param columnName name of the column the predicate applies to
+   * @param comparisonOp the Kudu comparison operator to apply
+   * @param anyvalueContext parse tree node holding the literal value
+   * @return the built predicate, or null on a type mismatch or an unsupported literal
+   */
+  private KuduPredicate buildKuduPredicate(String columnName,KuduPredicate.ComparisonOp comparisonOp,
+      KuduSQLExpressionParser.AnyvalueContext anyvalueContext)
+  {
+    ColumnSchema thisColumnSchema = columnSchemaLookups.get(columnName);
+    if (anyvalueContext.bool() != null) {
+      if (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.BOOL_VALUE)) {
+        LOG.error(" Mismatched data type for column " + columnName);
+        isSuccessfullyParsed  = false;
+        return null;
+      }
+      return KuduPredicate.newComparisonPredicate(thisColumnSchema,comparisonOp,
+        Boolean.valueOf(anyvalueContext.bool().getText().toLowerCase()));
+    }
+    if (anyvalueContext.doubleval() != null) {
+      if (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.DOUBLE_VALUE)) {
+        LOG.error(" Mismatched data type for column (Ensure doubles are appended with letter d)" + columnName);
+        isSuccessfullyParsed  = false;
+        return null;
+      }
+      return KuduPredicate.newComparisonPredicate(thisColumnSchema,comparisonOp,
+        Double.valueOf(anyvalueContext.doubleval().getText()));
+    }
+    if (anyvalueContext.floatval() != null) {
+      if (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.FLOAT_VALUE)) {
+        // Message corrected: this branch validates float literals, not doubles.
+        LOG.error(" Mismatched data type for column (Ensure floats are appended with letter f)" + columnName);
+        isSuccessfullyParsed  = false;
+        return null;
+      }
+      return KuduPredicate.newComparisonPredicate(thisColumnSchema,comparisonOp,
+        Float.valueOf(anyvalueContext.floatval().getText()));
+    }
+    if (anyvalueContext.stringval() != null) {
+      // A string literal may target either a STRING or a BINARY column.
+      if ( (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.STRING_VALUE)) &&
+          (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.BINARY_VALUE))) {
+        LOG.error(" Mismatched data type for column ( Has to be a string or a binary value enclosed in" +
+            " double quotes) " + columnName);
+        isSuccessfullyParsed  = false;
+        return null;
+      }
+      if (thisColumnSchema.getType().getDataType().getNumber() == (Common.DataType.STRING_VALUE)) {
+        return KuduPredicate.newComparisonPredicate(thisColumnSchema, comparisonOp,
+          anyvalueContext.stringval().getText());
+      }
+      if (thisColumnSchema.getType().getDataType().getNumber() == (Common.DataType.BINARY_VALUE)) {
+        return KuduPredicate.newComparisonPredicate(thisColumnSchema, comparisonOp,
+          anyvalueContext.stringval().getText().getBytes());
+      }
+    }
+    if (anyvalueContext.numval() != null) {
+      // Integer literals are accepted for all integral and timestamp column types.
+      int dataTypeNumberForKuduCol = thisColumnSchema.getType().getDataType().getNumber();
+      if (
+          (dataTypeNumberForKuduCol != (Common.DataType.UNIXTIME_MICROS_VALUE)) &&
+          (dataTypeNumberForKuduCol != (Common.DataType.INT8_VALUE)) &&
+          (dataTypeNumberForKuduCol != (Common.DataType.INT16_VALUE)) &&
+          (dataTypeNumberForKuduCol != (Common.DataType.INT32_VALUE)) &&
+          (dataTypeNumberForKuduCol != (Common.DataType.INT64_VALUE)) &&
+          (dataTypeNumberForKuduCol != (Common.DataType.UINT8_VALUE)) &&
+          (dataTypeNumberForKuduCol != (Common.DataType.UINT16_VALUE)) &&
+          (dataTypeNumberForKuduCol != (Common.DataType.UINT32_VALUE)) &&
+          (dataTypeNumberForKuduCol != (Common.DataType.UINT64_VALUE))
+          ) {
+        LOG.error(" Mismatched data type for column " + columnName);
+        isSuccessfullyParsed  = false;
+        return null;
+      }
+      return KuduPredicate.newComparisonPredicate(thisColumnSchema,comparisonOp,
+        Long.valueOf(anyvalueContext.numval().getText()));
+    }
+    return null;
+  }
+
+  /**
+   * Translates a comparison filter expression ( = , > , < , >= , <= ) into a Kudu
+   * predicate for the referenced column. A type mismatch between the literal and the
+   * column is flagged as a parse failure and no predicate is added.
+   */
+  @Override
+  public void exitFILTER_COMPARISION_EXP(KuduSQLExpressionParser.FILTER_COMPARISION_EXPContext ctx)
+  {
+    super.exitFILTER_COMPARISION_EXP(ctx);
+    String columnName = extractColumnNameFromContext(ctx.idorcolumnname());
+    if (columnSchemaLookups.containsKey(columnName)) {
+      KuduPredicate.ComparisonOp comparisonOp = null;
+      if (ctx.comparisionoperator().EQUAL_TO() != null) {
+        comparisonOp = KuduPredicate.ComparisonOp.EQUAL;
+      }
+      if (ctx.comparisionoperator().GREATER_THAN() != null) {
+        comparisonOp = KuduPredicate.ComparisonOp.GREATER;
+      }
+      if (ctx.comparisionoperator().LESSER_THAN() != null) {
+        comparisonOp = KuduPredicate.ComparisonOp.LESS;
+      }
+      if (ctx.comparisionoperator().GREATER_THAN_OR_EQUAL() != null) {
+        comparisonOp = KuduPredicate.ComparisonOp.GREATER_EQUAL;
+      }
+      if (ctx.comparisionoperator().LESSER_THAN_OR_EQUAL() != null) {
+        comparisonOp = KuduPredicate.ComparisonOp.LESS_EQUAL;
+      }
+      if (comparisonOp != null) {
+        KuduPredicate predicate = buildKuduPredicate(columnName,comparisonOp,ctx.anyvalue());
+        // buildKuduPredicate returns null on a data type mismatch; guard against adding
+        // null entries to the predicate list as they would break scanner construction.
+        if (predicate != null) {
+          kuduPredicateList.add(predicate);
+        }
+      }
+    } else {
+      LOG.error(columnName + " is not a valid column name for this kudu table");
+      isSuccessfullyParsed = false;
+    }
+  }
+
+  @Override
+  public void exitIS_NULL_FILTER_EXP(KuduSQLExpressionParser.IS_NULL_FILTER_EXPContext ctx)
+  {
+    super.exitIS_NULL_FILTER_EXP(ctx);
+    // Translate an "IS NULL" filter into the equivalent Kudu predicate, provided the
+    // referenced column exists in the table schema.
+    String columnName = extractColumnNameFromContext(ctx.idorcolumnname());
+    if (columnSchemaLookups.containsKey(columnName)) {
+      kuduPredicateList.add(KuduPredicate.newIsNullPredicate(columnSchemaLookups.get(columnName)));
+    } else {
+      LOG.error(columnName + " is not a valid column name for this kudu table");
+      isSuccessfullyParsed = false;
+    }
+
+  }
+
+  /**
+   * Collects the boolean literals of an IN expression into a list. An empty list is
+   * flagged as a parse failure for the given column filter.
+   */
+  private List<Boolean> extractListOfBools(KuduSQLExpressionParser.ListofboolsContext listofboolsContext,
+      String columnName)
+  {
+    List<Boolean> parsedBools = new ArrayList<>();
+    for (KuduSQLExpressionParser.BoolContext boolNode: listofboolsContext.bool()) {
+      parsedBools.add(Boolean.valueOf(boolNode.getText().toLowerCase()));
+    }
+    if (parsedBools.isEmpty()) {
+      LOG.error("Empty list of booleans specified for IN expression for column filter " + columnName);
+      isSuccessfullyParsed = false;
+    }
+    return parsedBools;
+  }
+
+
+  /**
+   * Collects the double literals of an IN expression into a list. An empty list is
+   * flagged as a parse failure for the given column filter.
+   */
+  private List<Double> extractListOfDoubles(KuduSQLExpressionParser.ListofdoublesContext listofdoubles,
+      String columnName)
+  {
+    List<Double> parsedDoubles = new ArrayList<>();
+    for (KuduSQLExpressionParser.DoublevalContext doubleNode: listofdoubles.doubleval()) {
+      parsedDoubles.add(Double.valueOf(doubleNode.getText()));
+    }
+    if (parsedDoubles.isEmpty()) {
+      LOG.error("Empty list of doubles specified for IN expression for column filter " + columnName);
+      isSuccessfullyParsed = false;
+    }
+    return parsedDoubles;
+  }
+
+  /**
+   * Collects the float literals of an IN expression into a list. An empty list is
+   * flagged as a parse failure for the given column filter.
+   */
+  private List<Float> extractListOfFloats(KuduSQLExpressionParser.ListoffloatsContext listoffloatsContext,
+      String columnName)
+  {
+    List<Float> parsedFloats = new ArrayList<>();
+    for (KuduSQLExpressionParser.FloatvalContext floatNode: listoffloatsContext.floatval()) {
+      parsedFloats.add(Float.valueOf(floatNode.getText()));
+    }
+    if (parsedFloats.isEmpty()) {
+      LOG.error("Empty list of floats specified for IN expression for column filter " + columnName);
+      isSuccessfullyParsed = false;
+    }
+    return parsedFloats;
+  }
+
+
+  /**
+   * Collects the integer literals of an IN expression into a list of longs. An empty
+   * list is flagged as a parse failure for the given column filter.
+   */
+  private List<Long> extractListOfLongs(KuduSQLExpressionParser.ListofnumsContext listofnumsContext,
+      String columnName)
+  {
+    List<Long> parsedLongs = new ArrayList<>();
+    for (KuduSQLExpressionParser.NumvalContext numNode: listofnumsContext.numval()) {
+      parsedLongs.add(Long.valueOf(numNode.getText()));
+    }
+    if (parsedLongs.isEmpty()) {
+      LOG.error("Empty list of longs specified for IN expression for column filter " + columnName);
+      isSuccessfullyParsed = false;
+    }
+    return parsedLongs;
+  }
+
+
+  /**
+   * Collects the string literals of an IN expression into a list. An empty list is
+   * flagged as a parse failure for the given column filter.
+   */
+  private List<String> extractListOfStrings(KuduSQLExpressionParser.ListofstringsContext listofstringsContext,
+      String columnName)
+  {
+    List<String> parsedStrings = new ArrayList<>();
+    for (KuduSQLExpressionParser.StringvalContext stringNode: listofstringsContext.stringval()) {
+      parsedStrings.add(stringNode.getText());
+    }
+    if (parsedStrings.isEmpty()) {
+      LOG.error("Empty list of Strings specified for IN expression for column filter " + columnName);
+      isSuccessfullyParsed = false;
+    }
+    return parsedStrings;
+  }
+
+
+  /**
+   * Collects the string literals of an IN expression as byte arrays, for use against a
+   * binary typed column. An empty list is flagged as a parse failure.
+   */
+  private List<byte[]> extractListOfBinary(KuduSQLExpressionParser.ListofstringsContext listofstringsContext,
+      String columnName)
+  {
+    List<byte[]> parsedBinaries = new ArrayList<>();
+    for (KuduSQLExpressionParser.StringvalContext stringNode: listofstringsContext.stringval()) {
+      parsedBinaries.add(stringNode.getText().getBytes());
+    }
+    if (parsedBinaries.isEmpty()) {
+      LOG.error("Empty list of binaries specified for IN expression for column filter " + columnName);
+      isSuccessfullyParsed = false;
+    }
+    return parsedBinaries;
+  }
+
+
+
+  /**
+   * Translates an IN filter expression into a Kudu in-list predicate. The literal list
+   * type in the parse tree selects which extractor is used; string literal lists are
+   * mapped to either a string or a binary in-list predicate depending on the column type.
+   */
+  @Override
+  public void exitIN_FILTER_EXP(KuduSQLExpressionParser.IN_FILTER_EXPContext ctx)
+  {
+    super.exitIN_FILTER_EXP(ctx);
+    String columnName = extractColumnNameFromContext(ctx.idorcolumnname());
+    if (columnSchemaLookups.containsKey(columnName)) {
+      KuduSQLParser.ListofanyvalueContext  listofanyvalueContext = ctx.listofanyvalue();
+      // sort out an IN predicate for a list of bools
+      KuduSQLExpressionParser.ListofboolsContext listofboolsContext = listofanyvalueContext.listofbools();
+      if (listofboolsContext != null) {
+        kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+            extractListOfBools(listofboolsContext,columnName)));
+      }
+
+      // sort out an IN predicate for a list of doubles
+      KuduSQLExpressionParser.ListofdoublesContext listofdoubles = listofanyvalueContext.listofdoubles();
+      if (listofdoubles != null) {
+        kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+            extractListOfDoubles(listofdoubles,columnName)));
+      }
+
+      // sort out an IN predicate for a list of floats
+      KuduSQLExpressionParser.ListoffloatsContext listoffloatsContext = listofanyvalueContext.listoffloats();
+      if (listoffloatsContext != null) {
+        kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+            extractListOfFloats(listoffloatsContext,columnName)));
+      }
+
+      // sort out an IN predicate for a list of longs
+      KuduSQLExpressionParser.ListofnumsContext listofnumsContext = listofanyvalueContext.listofnums();
+      if (listofnumsContext != null) {
+        kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+            extractListOfLongs(listofnumsContext,columnName)));
+      }
+
+      // sort out an IN predicate for a list of strings or binary
+      KuduSQLExpressionParser.ListofstringsContext listofstringsContext = listofanyvalueContext.listofstrings();
+      if (listofstringsContext != null) {
+        if (columnSchemaLookups.get(columnName).getType().getDataType().getNumber() == Common.DataType.STRING_VALUE) {
+          kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+              extractListOfStrings(listofstringsContext,columnName)));
+        }
+        // Fixed: the binary branch previously re-tested STRING_VALUE, so binary columns
+        // never received an in-list predicate and string columns got a duplicate one.
+        if (columnSchemaLookups.get(columnName).getType().getDataType().getNumber() == Common.DataType.BINARY_VALUE) {
+          kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+              extractListOfBinary(listofstringsContext,columnName)));
+        }
+      }
+
+    } else {
+      LOG.error(columnName + " is not a valid column name to be used in a where clause for this kudu table");
+      isSuccessfullyParsed = false;
+    }
+  }
+
+  @Override
+  public void exitIS_NOT_NULL_FILTER_EXP(KuduSQLExpressionParser.IS_NOT_NULL_FILTER_EXPContext ctx)
+  {
+    super.exitIS_NOT_NULL_FILTER_EXP(ctx);
+    // Translate an "IS NOT NULL" filter into the equivalent Kudu predicate, provided
+    // the referenced column exists in the table schema.
+    String columnName = extractColumnNameFromContext(ctx.idorcolumnname());
+    if (columnSchemaLookups.containsKey(columnName)) {
+      kuduPredicateList.add(KuduPredicate.newIsNotNullPredicate(columnSchemaLookups.get(columnName)));
+    } else {
+      LOG.error(columnName + " is not a valid column name for this kudu table");
+      isSuccessfullyParsed = false;
+    }
+  }
+
+  @Override
+  public void exitSET_CONTROL_TUPLE_MSG(KuduSQLExpressionParser.SET_CONTROL_TUPLE_MSGContext ctx)
+  {
+    super.exitSET_CONTROL_TUPLE_MSG(ctx);
+    // Capture the user supplied control tuple message and record it in the options
+    // map under its option name so downstream processing can look it up.
+    controlTupleMessage = ctx.STRINGVAL().getText();
+    optionsUsed.put(CONTROLTUPLE_MESSAGE,controlTupleMessage);
+  }
+
+  @Override
+  public void exitSET_READ_SNAPSHOT_TIME(KuduSQLExpressionParser.SET_READ_SNAPSHOT_TIMEContext ctx)
+  {
+    super.exitSET_READ_SNAPSHOT_TIME(ctx);
+    // Capture the read snapshot time option and record it in the options map.
+    readSnapshotTime = Long.valueOf(ctx.INT().getText());
+    optionsUsed.put(READ_SNAPSHOT_TIME, "" + readSnapshotTime);
+  }
+
+  // ---- Simple accessors exposing parse results and parser state flags ----
+
+  public boolean isSuccessfullyParsed()
+  {
+    return isSuccessfullyParsed;
+  }
+
+  public void setSuccessfullyParsed(boolean successfullyParsed)
+  {
+    isSuccessfullyParsed = successfullyParsed;
+  }
+
+  public Set<String> getListOfColumnsUsed()
+  {
+    return listOfColumnsUsed;
+  }
+
+  public void setListOfColumnsUsed(Set<String> listOfColumnsUsed)
+  {
+    this.listOfColumnsUsed = listOfColumnsUsed;
+  }
+
+  public Map<String, String> getAliases()
+  {
+    return aliases;
+  }
+
+  public void setAliases(Map<String, String> aliases)
+  {
+    this.aliases = aliases;
+  }
+
+  public Map<String, String> getOptionsUsed()
+  {
+    return optionsUsed;
+  }
+
+  public void setOptionsUsed(Map<String, String> optionsUsed)
+  {
+    this.optionsUsed = optionsUsed;
+  }
+
+  public List<KuduPredicate> getKuduPredicateList()
+  {
+    return kuduPredicateList;
+  }
+
+  public void setKuduPredicateList(List<KuduPredicate> kuduPredicateList)
+  {
+    this.kuduPredicateList = kuduPredicateList;
+  }
+
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  public boolean isSelectStarExpressionEnabled()
+  {
+    return isSelectStarExpressionEnabled;
+  }
+
+  public void setSelectStarExpressionEnabled(boolean selectStarExpressionEnabled)
+  {
+    isSelectStarExpressionEnabled = selectStarExpressionEnabled;
+  }
+
+  public boolean isFilterExpressionEnabled()
+  {
+    return isFilterExpressionEnabled;
+  }
+
+  public void setFilterExpressionEnabled(boolean filterExpressionEnabled)
+  {
+    isFilterExpressionEnabled = filterExpressionEnabled;
+  }
+
+  public boolean isOptionsEnabled()
+  {
+    return isOptionsEnabled;
+  }
+
+  public void setOptionsEnabled(boolean optionsEnabled)
+  {
+    isOptionsEnabled = optionsEnabled;
+  }
+
+  public String getControlTupleMessage()
+  {
+    return controlTupleMessage;
+  }
+
+  public void setControlTupleMessage(String controlTupleMessage)
+  {
+    this.controlTupleMessage = controlTupleMessage;
+  }
+
+  public Long getReadSnapshotTime()
+  {
+    return readSnapshotTime;
+  }
+
+  public void setReadSnapshotTime(Long readSnapshotTime)
+  {
+    this.readSnapshotTime = readSnapshotTime;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParser.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParser.java
new file mode 100644
index 0000000..acb221e
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParser.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import org.antlr.v4.runtime.TokenStream;
+
+import org.apache.apex.malhar.kudu.sqlparser.KuduSQLExpressionParser;
+
+/**
+ * A simple wrapper on the top of the auto-generated KuduSQL expression parser.
+ */
+public class KuduSQLParser extends KuduSQLExpressionParser
+{
+  // Listener that records syntax errors so they can be inspected after parsing.
+  private KuduSQLExpressionErrorListener kuduSQLExpressionErrorListener;
+
+  public KuduSQLParser(TokenStream input)
+  {
+    super(input);
+    // Remove the default ANTLR error listeners (which print to the console) and
+    // install our own so that syntax errors can be queried programmatically.
+    removeErrorListeners();
+    kuduSQLExpressionErrorListener = new KuduSQLExpressionErrorListener();
+    addErrorListener(kuduSQLExpressionErrorListener);
+  }
+
+  public KuduSQLExpressionErrorListener getKuduSQLExpressionErrorListener()
+  {
+    return kuduSQLExpressionErrorListener;
+  }
+
+  public void setKuduSQLExpressionErrorListener(KuduSQLExpressionErrorListener kuduSQLExpressionErrorListener)
+  {
+    this.kuduSQLExpressionErrorListener = kuduSQLExpressionErrorListener;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslator.java
new file mode 100644
index 0000000..e34f381
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslator.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import java.util.List;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.tree.ParseTree;
+import org.antlr.v4.runtime.tree.ParseTreeWalker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.sqlparser.KuduSQLExpressionLexer;
+import org.apache.kudu.ColumnSchema;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A class responsible for parsing an SQL expression and return parsed Kudu Client equivalent objects.
+ * This class is not Thread safe.
+ */
+public class SQLToKuduPredicatesTranslator
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(SQLToKuduPredicatesTranslator.class);
+
+  private String sqlExpresssion = null;
+
+  private KuduSQLExpressionErrorListener errorListener = null;
+
+  private KuduSQLParser parser = null;
+
+  private KuduSQLParseTreeListener kuduSQLParseTreeListener = null;
+
+  private List<ColumnSchema> allColumnsForThisTable = null;
+
+  public SQLToKuduPredicatesTranslator(String sqlExpresssionForParsing, List<ColumnSchema> tableColumns)
+    throws Exception
+  {
+    Preconditions.checkNotNull(tableColumns,"Kudu table cannot have null or empty columns");
+    Preconditions.checkNotNull(sqlExpresssionForParsing,"Kudu SQL expression cannot be null");
+    sqlExpresssion = sqlExpresssionForParsing;
+    allColumnsForThisTable = tableColumns;
+    parseKuduExpression();
+  }
+
+  public void parseKuduExpression() throws Exception
+  {
+    KuduSQLExpressionLexer lexer = new KuduSQLExpressionLexer(CharStreams.fromString(sqlExpresssion));
+    CommonTokenStream tokens = new CommonTokenStream( lexer );
+    parser = new KuduSQLParser( tokens );
+    errorListener = parser.getKuduSQLExpressionErrorListener();
+    ParseTree parserTree = parser.kudusqlexpression();
+    if (!errorListener.isSyntaxError()) {
+      ParseTreeWalker parseTreeWalker = new ParseTreeWalker();
+      kuduSQLParseTreeListener = new KuduSQLParseTreeListener();
+      kuduSQLParseTreeListener.setColumnSchemaList(allColumnsForThisTable);
+      try {
+        parseTreeWalker.walk(kuduSQLParseTreeListener, parserTree);
+      } catch (Exception ex) {
+        LOG.error(" The supplied SQL expression could not be parsed because " + ex.getMessage(),ex);
+        errorListener.setSyntaxError(true);
+      }
+    } else {
+      LOG.error(" Syntax error present in the Kudu SQL expression. Hence not processing");
+      List<String> allRegisteredSyntaxErrors = errorListener.getListOfErrorMessages();
+      for (String syntaxErrorMessage : allRegisteredSyntaxErrors) {
+        LOG.error(" Error : " + syntaxErrorMessage  + " in SQL expression \"" + sqlExpresssion + " \"");
+      }
+    }
+  }
+
+
+  public KuduSQLExpressionErrorListener getErrorListener()
+  {
+    return errorListener;
+  }
+
+  public void setErrorListener(KuduSQLExpressionErrorListener errorListener)
+  {
+    this.errorListener = errorListener;
+  }
+
+  public String getSqlExpresssion()
+  {
+    return sqlExpresssion;
+  }
+
+  public void setSqlExpresssion(String sqlExpresssion)
+  {
+    this.sqlExpresssion = sqlExpresssion;
+  }
+
+  public KuduSQLParser getParser()
+  {
+    return parser;
+  }
+
+  public void setParser(KuduSQLParser parser)
+  {
+    this.parser = parser;
+  }
+
+  public KuduSQLParseTreeListener getKuduSQLParseTreeListener()
+  {
+    return kuduSQLParseTreeListener;
+  }
+
+  public void setKuduSQLParseTreeListener(KuduSQLParseTreeListener kuduSQLParseTreeListener)
+  {
+    this.kuduSQLParseTreeListener = kuduSQLParseTreeListener;
+  }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/package-info.java
similarity index 56%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/package-info.java
index 64b46c6..85ce16c 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/package-info.java
@@ -16,17 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
-
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+/***
+ * The sqltranslator package classes are responsible for constructing Kudu predicates given a SQL expression
+ *  as input. The main approach to build Kudu predicates is by implementing a tree listener that gets
+ *  callbacks from the Antlr parser engine basing on the grammar annotations. There is also an Error Listener to
+ *  help in marking a given SQL expression with the appropriate error markers so that they can be used in the code
+ *  to identify the exact error causes.
  */
-public enum KuduMutationType
-{
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.kudu.sqltranslator;
 
-    INSERT,
-    DELETE,
-    UPDATE,
-    UPSERT
-}
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperatorTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperatorTest.java
new file mode 100644
index 0000000..2ed3e09
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperatorTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class AbstractKuduInputOperatorTest extends KuduInputOperatorCommons
+{
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+  /**
+   * Verifies that a syntactically invalid SQL expression results in no tuples being
+   * buffered for emission.
+   */
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testForErrorPortInCaseOfSQLError() throws Exception
+  {
+    partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+    numberOfKuduInputOperatorPartitions = 5;
+    initOperatorState();
+    truncateTable();
+    addTestDataRows(10);
+    unitTestStepwiseScanInputOperator.getBuffer().clear();
+    unitTestStepwiseScanInputOperator.beginWindow(2L);
+    // The where clause below is intentionally incomplete and must fail to parse.
+    unitTestStepwiseScanInputOperator.processForQueryString("Select * from unittests where ");
+    unitTestStepwiseScanInputOperator.endWindow();
+    Thread.sleep(10000); // Sleep to allow for scans to complete
+    assertEquals(0,unitTestStepwiseScanInputOperator.getBuffer().size());
+    // revert the partitioning related changes for subsequent tests
+    partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+    numberOfKuduInputOperatorPartitions = 5;
+
+  }
+
+  /**
+   * Verifies that a begin and an end scan control marker is buffered per tablet in
+   * addition to the data tuples of a successful scan.
+   */
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testForSendingBeginAndEndScanMarkers() throws Exception
+  {
+    partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+    numberOfKuduInputOperatorPartitions = 1;
+    initOperatorState();
+    // truncate and add some data to the unit test table
+    truncateTable();
+    addTestDataRows(10); // This is per partition and there are 12 partitions
+    unitTestStepwiseScanInputOperator.getBuffer().clear();
+    unitTestStepwiseScanInputOperator.beginWindow(5);
+    unitTestStepwiseScanInputOperator.processForQueryString("SELECT * FROM unittests ");
+    Thread.sleep(10000); // Sleep to allow for scans to complete
+    // per tablet: 10 data tuples plus one begin and one end scan marker each
+    int expectedCount = (10 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE) +
+        ( 2 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE);
+    assertEquals(expectedCount,unitTestStepwiseScanInputOperator.getBuffer().size());
+    // below only for debugging session usage and hence no asserts
+    for ( int i = 0; i < expectedCount; i++) { // 12 partitions : 144 = 12 * 10 + 12 * 2 ( begin , end scan markers )
+      unitTestStepwiseScanInputOperator.emitTuples();
+    }
+    unitTestStepwiseScanInputOperator.endWindow();
+    // revert all of the changes
+    partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+    numberOfKuduInputOperatorPartitions = 5;
+
+  }
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperatorTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperatorTest.java
new file mode 100644
index 0000000..7feb843
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperatorTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.scanner.KuduScanOrderStrategy;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.TestPortContext;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+public class IncrementalStepScanInputOperatorTest extends KuduClientTestCommons
+{
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+  private static final Logger LOG = LoggerFactory.getLogger(IncrementalStepScanInputOperatorTest.class);
+
+  public static final String APP_ID = "TestIncrementalScanInputOperator";
+  public static final int OPERATOR_ID_FOR_ONE_TO_ONE_PARTITIONER = 1;
+
+  protected Context.OperatorContext operatorContext;
+  protected TestPortContext testPortContext;
+  protected IncrementalStepScanInputOperator<UnitTestTablePojo,InputOperatorControlTuple>
+      incrementalStepScanInputOperator;
+  protected Collection<Partitioner.Partition<AbstractKuduInputOperator>> partitions;
+
+  protected KuduPartitionScanStrategy partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+
+  protected KuduScanOrderStrategy scanOrderStrategy = KuduScanOrderStrategy.RANDOM_ORDER_SCANNER;
+
+  protected  int numberOfKuduInputOperatorPartitions = 5;
+
+  protected Partitioner.PartitioningContext partitioningContext;
+
+  /**
+   * Builds an operator instance, runs partitioning and rewires the resulting partition
+   * as the operator under test, mimicking what the Apex engine does at launch time.
+   */
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testInit() throws Exception
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributeMapForInputOperator =
+        new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMapForInputOperator.put(DAG.APPLICATION_ID, APP_ID);
+    operatorContext = mockOperatorContext(OPERATOR_ID_FOR_ONE_TO_ONE_PARTITIONER,
+      attributeMapForInputOperator);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributesForInputOperator =
+        new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributesForInputOperator.put(Context.PortContext.TUPLE_CLASS, UnitTestTablePojo.class);
+    testPortContext = new TestPortContext(portAttributesForInputOperator);
+
+    // Use the diamond operator instead of a raw constructor invocation.
+    incrementalStepScanInputOperator = new IncrementalStepScanInputOperator<>(UnitTestTablePojo.class,
+        "kuduincrementalstepscaninputoperator.properties");
+    incrementalStepScanInputOperator.setNumberOfPartitions(numberOfKuduInputOperatorPartitions);
+    incrementalStepScanInputOperator.setPartitionScanStrategy(partitonScanStrategy);
+    incrementalStepScanInputOperator.setScanOrderStrategy(scanOrderStrategy);
+    partitioningContext = new Partitioner.PartitioningContext()
+    {
+      @Override
+      public int getParallelPartitionCount()
+      {
+        return numberOfKuduInputOperatorPartitions;
+      }
+
+      @Override
+      public List<Operator.InputPort<?>> getInputPorts()
+      {
+        return null;
+      }
+    };
+    // Pass a properly parameterized empty collection instead of a raw ArrayList.
+    partitions = incrementalStepScanInputOperator.definePartitions(
+      new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(), partitioningContext);
+    Iterator<Partitioner.Partition<AbstractKuduInputOperator>> iteratorForMeta = partitions.iterator();
+    IncrementalStepScanInputOperator<UnitTestTablePojo,InputOperatorControlTuple> actualOperator =
+        (IncrementalStepScanInputOperator<UnitTestTablePojo,InputOperatorControlTuple>)
+        iteratorForMeta.next().getPartitionedInstance();
+    // Adjust the bindings as if apex has completed the partioning.The runtime of the framework does this in reality
+    incrementalStepScanInputOperator = actualOperator;
+    incrementalStepScanInputOperator.setup(operatorContext);
+    incrementalStepScanInputOperator.activate(operatorContext);
+    //rewire parent operator to enable proper unit testing method calls
+    incrementalStepScanInputOperator.getPartitioner().setPrototypeKuduInputOperator(incrementalStepScanInputOperator);
+    incrementalStepScanInputOperator.getScanner().setParentOperator(incrementalStepScanInputOperator);
+  }
+
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduClientTestCommons.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduClientTestCommons.java
new file mode 100644
index 0000000..b7c63f4
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduClientTestCommons.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/** Common test code for all kudu operators
+ */
+public class KuduClientTestCommons
+{
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(KuduClientTestCommons.class);
+
+
+  protected static final String tableName = "unittests";
+
+  protected static KuduClient kuduClient;
+
+  protected static KuduTable kuduTable;
+
+  protected static Schema schemaForUnitTests;
+
+  protected static String kuduMasterAddresses = "192.168.1.41:7051";
+
+  public static final int SPLIT_COUNT_FOR_INT_ROW_KEY = 5;
+
+  public static final int HASH_BUCKETS_SIZE_FOR_ALL_HASH_COL = 2;
+
+  public static final int TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE = 12;
+
+  public static boolean tableInitialized = false;
+
+  public static Object objectForLocking = new Object();
+
+  protected static Map<String,ColumnSchema> columnDefs = new HashMap<>();
+
+
+  public static void setup() throws Exception
+  {
+    kuduClient = getClientHandle();
+    if (kuduClient.tableExists(tableName)) {
+      kuduClient.deleteTable(tableName);
+    }
+    createTestTable(tableName,kuduClient);
+    kuduTable = kuduClient.openTable(tableName);
+    tableInitialized = true;
+  }
+
+
+  public static void shutdown() throws Exception
+  {
+    kuduClient.close();
+  }
+
+
+  private static KuduClient getClientHandle() throws Exception
+  {
+    KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder(kuduMasterAddresses);
+    KuduClient client = builder.build();
+    return client;
+  }
+
+  public static ApexKuduConnection.ApexKuduConnectionBuilder getConnectionConfigForTable()
+  {
+    ApexKuduConnection.ApexKuduConnectionBuilder connectionBuilder = new ApexKuduConnection.ApexKuduConnectionBuilder();
+    return connectionBuilder.withAPossibleMasterHostAs(kuduMasterAddresses).withTableName(tableName);
+  }
+
+  public static Schema buildSchemaForUnitTestsTable() throws Exception
+  {
+    if (schemaForUnitTests != null) {
+      return schemaForUnitTests;
+    }
+    List<ColumnSchema> columns = new ArrayList<>();
+    ColumnSchema intRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("introwkey", Type.INT32)
+        .key(true)
+        .build();
+    columns.add(intRowKeyCol);
+    columnDefs.put("introwkey",intRowKeyCol);
+    ColumnSchema stringRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("stringrowkey", Type.STRING)
+        .key(true)
+        .build();
+    columns.add(stringRowKeyCol);
+    columnDefs.put("stringrowkey",stringRowKeyCol);
+    ColumnSchema timestampRowKey = new ColumnSchema.ColumnSchemaBuilder("timestamprowkey", Type.UNIXTIME_MICROS)
+        .key(true)
+        .build();
+    columns.add(timestampRowKey);
+    columnDefs.put("timestamprowkey",timestampRowKey);
+    ColumnSchema longData = new ColumnSchema.ColumnSchemaBuilder("longdata", Type.INT64)
+        .nullable(true)
+        .build();
+    columns.add(longData);
+    columnDefs.put("longdata",longData);
+    ColumnSchema stringData = new ColumnSchema.ColumnSchemaBuilder("stringdata", Type.STRING)
+        .nullable(true)
+        .build();
+    columns.add(stringData);
+    columnDefs.put("stringdata",stringData);
+    ColumnSchema timestampdata = new ColumnSchema.ColumnSchemaBuilder("timestampdata", Type.UNIXTIME_MICROS)
+        .nullable(true)
+        .build();
+    columns.add(timestampdata);
+    columnDefs.put("timestampdata",timestampdata);
+    ColumnSchema binarydata = new ColumnSchema.ColumnSchemaBuilder("binarydata", Type.BINARY)
+        .nullable(true)
+        .build();
+    columns.add(binarydata);
+    columnDefs.put("binarydata",binarydata);
+    ColumnSchema floatdata = new ColumnSchema.ColumnSchemaBuilder("floatdata", Type.FLOAT)
+        .nullable(true)
+        .build();
+    columns.add(floatdata);
+    columnDefs.put("floatdata",floatdata);
+    ColumnSchema booldata = new ColumnSchema.ColumnSchemaBuilder("booldata", Type.BOOL)
+        .nullable(true)
+        .build();
+    columns.add(booldata);
+    columnDefs.put("booldata",booldata);
+    schemaForUnitTests = new Schema(columns);
+    return schemaForUnitTests;
+  }
+
+  public static void createTestTable(String tableName, KuduClient client) throws Exception
+  {
+    List<String> rangeKeys = new ArrayList<>();
+    rangeKeys.add("introwkey");
+    List<String> hashPartitions = new ArrayList<>();
+    hashPartitions.add("stringrowkey");
+    hashPartitions.add("timestamprowkey");
+    CreateTableOptions thisTableOptions = new CreateTableOptions()
+        .setNumReplicas(1)
+        .addHashPartitions(hashPartitions,HASH_BUCKETS_SIZE_FOR_ALL_HASH_COL)
+        .setRangePartitionColumns(rangeKeys);
+    int stepsize = Integer.MAX_VALUE / SPLIT_COUNT_FOR_INT_ROW_KEY;
+    int splitBoundary = stepsize;
+    Schema schema = buildSchemaForUnitTestsTable();
+    for ( int i = 0; i < SPLIT_COUNT_FOR_INT_ROW_KEY; i++) {
+      PartialRow splitRowBoundary = schema.newPartialRow();
+      splitRowBoundary.addInt("introwkey",splitBoundary);
+      thisTableOptions = thisTableOptions.addSplitRow(splitRowBoundary);
+      splitBoundary += stepsize;
+    }
+    try {
+      client.createTable(tableName, schema,thisTableOptions);
+    } catch (KuduException e) {
+      LOG.error("Error while creating table for unit tests " + e.getMessage(), e);
+      throw e;
+    }
+
+  }
+
+  protected void lookUpAndPopulateRecord(UnitTestTablePojo keyInfo) throws Exception
+  {
+    KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable)
+        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("introwkey"),
+        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getIntrowkey()))
+        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("stringrowkey"),
+        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getStringrowkey()))
+        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("timestamprowkey"),
+        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getTimestamprowkey()))
+        .build();
+    RowResultIterator rowResultItr = scanner.nextRows();
+    while (rowResultItr.hasNext()) {
+      RowResult thisRow = rowResultItr.next();
+      keyInfo.setFloatdata(thisRow.getFloat("floatdata"));
+      keyInfo.setBooldata(thisRow.getBoolean("booldata"));
+      keyInfo.setBinarydata(thisRow.getBinary("binarydata"));
+      keyInfo.setLongdata(thisRow.getLong("longdata"));
+      keyInfo.setTimestampdata(thisRow.getLong("timestampdata"));
+      keyInfo.setStringdata(thisRow.getString("stringdata"));
+      break;
+    }
+  }
+
+  public ApexKuduConnection buildMockWiring(AbstractKuduInputOperator abstractKuduInputOperator,
+      int numScanTokens) throws Exception
+  {
+    ApexKuduConnection mockedConnectionHandle = PowerMockito.mock(ApexKuduConnection.class);
+    ApexKuduConnection.ApexKuduConnectionBuilder mockedConnectionHandleBuilder = PowerMockito.mock(
+        ApexKuduConnection.ApexKuduConnectionBuilder.class);
+    KuduClient mockedClient = PowerMockito.mock(KuduClient.class);
+    KuduSession mockedKuduSession = PowerMockito.mock(KuduSession.class);
+    KuduTable mockedKuduTable = PowerMockito.mock(KuduTable.class);
+    KuduScanToken.KuduScanTokenBuilder mockedScanTokenBuilder = PowerMockito.mock(
+        KuduScanToken.KuduScanTokenBuilder.class);
+    List<KuduScanToken> mockedScanTokens = new ArrayList<>();
+    int scanTokensToBuild = numScanTokens;
+    for (int i = 0; i < scanTokensToBuild; i++) {
+      mockedScanTokens.add(PowerMockito.mock(KuduScanToken.class));
+    }
+    PowerMockito.mockStatic(KryoCloneUtils.class);
+    when(KryoCloneUtils.cloneObject(abstractKuduInputOperator)).thenReturn(abstractKuduInputOperator);
+    //wire the mocks
+    when(abstractKuduInputOperator.getApexKuduConnectionInfo()).thenReturn(mockedConnectionHandleBuilder);
+    when(mockedConnectionHandle.getKuduClient()).thenReturn(mockedClient);
+    when(mockedClient.newSession()).thenReturn(mockedKuduSession);
+    when(mockedConnectionHandle.getKuduTable()).thenReturn(mockedKuduTable);
+    when(mockedConnectionHandle.getKuduSession()).thenReturn(mockedKuduSession);
+    when(mockedConnectionHandle.getBuilderForThisConnection()).thenReturn(mockedConnectionHandleBuilder);
+    when(mockedClient.openTable(tableName)).thenReturn(mockedKuduTable);
+    when(mockedConnectionHandleBuilder.build()).thenReturn(mockedConnectionHandle);
+    when(mockedKuduTable.getSchema()).thenReturn(schemaForUnitTests);
+    when(mockedClient.newScanTokenBuilder(mockedKuduTable)).thenReturn(mockedScanTokenBuilder);
+    when(mockedScanTokenBuilder.build()).thenReturn(mockedScanTokens);
+    return mockedConnectionHandle;
+  }
+
+  public static String getKuduMasterAddresses()
+  {
+    return kuduMasterAddresses;
+  }
+
+  public static void setKuduMasterAddresses(String kuduMasterAddresses)
+  {
+    KuduClientTestCommons.kuduMasterAddresses = kuduMasterAddresses;
+  }
+}
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
similarity index 65%
rename from contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
rename to kudu/src/test/java/org/apache/apex/malhar/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
index 615427a..b4ab141 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
@@ -16,82 +16,47 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.junit.AfterClass;
+
 import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduPredicate;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
+
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
 import com.datatorrent.lib.helper.TestPortContext;
+
 import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
 import static org.junit.Assert.assertEquals;
 
-
-public class KuduCreateUpdateDeleteOutputOperatorTest
+public class KuduCreateUpdateDeleteOutputOperatorTest extends KuduClientTestCommons
 {
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
 
 
   private static final transient Logger LOG = LoggerFactory.getLogger(KuduCreateUpdateDeleteOutputOperatorTest.class);
 
-  private static final String tableName = "unittests";
-
   private final String APP_ID = "TestKuduOutputOperator";
 
   private final int OPERATOR_ID_FOR_KUDU_CRUD = 0;
 
-  private static KuduClient kuduClient;
-
-  private static KuduTable kuduTable;
-
-  private static Map<String,ColumnSchema> columnDefs = new HashMap<>();
-
   private BaseKuduOutputOperator simpleKuduOutputOperator;
 
   private OperatorContext contextForKuduOutputOperator;
 
   private TestPortContext testPortContextForKuduOutput;
 
-  @BeforeClass
-  public static void setup() throws Exception
-  {
-    kuduClient = getClientHandle();
-    if (kuduClient.tableExists(tableName)) {
-      kuduClient.deleteTable(tableName);
-    }
-    createTestTable(tableName,kuduClient);
-    kuduTable = kuduClient.openTable(tableName);
-  }
-
-  @AfterClass
-  public static void shutdown() throws Exception
-  {
-    kuduClient.close();
-  }
-
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
   @Before
   public void setUpKuduOutputOperatorContext() throws Exception
   {
@@ -107,96 +72,8 @@ public class KuduCreateUpdateDeleteOutputOperatorTest
     simpleKuduOutputOperator.input.setup(testPortContextForKuduOutput);
   }
 
-  private static KuduClient getClientHandle() throws Exception
-  {
-    KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder("localhost:7051");
-    KuduClient client = builder.build();
-    return client;
-  }
-
-  private static void createTestTable(String tableName, KuduClient client) throws Exception
-  {
-    List<ColumnSchema> columns = new ArrayList<>();
-    ColumnSchema intRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("introwkey", Type.INT32)
-        .key(true)
-        .build();
-    columns.add(intRowKeyCol);
-    columnDefs.put("introwkey",intRowKeyCol);
-    ColumnSchema stringRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("stringrowkey", Type.STRING)
-        .key(true)
-        .build();
-    columns.add(stringRowKeyCol);
-    columnDefs.put("stringrowkey",stringRowKeyCol);
-    ColumnSchema timestampRowKey = new ColumnSchema.ColumnSchemaBuilder("timestamprowkey", Type.UNIXTIME_MICROS)
-        .key(true)
-        .build();
-    columns.add(timestampRowKey);
-    columnDefs.put("timestamprowkey",timestampRowKey);
-    ColumnSchema longData = new ColumnSchema.ColumnSchemaBuilder("longdata", Type.INT64)
-        .build();
-    columns.add(longData);
-    columnDefs.put("longdata",longData);
-    ColumnSchema stringData = new ColumnSchema.ColumnSchemaBuilder("stringdata", Type.STRING)
-        .build();
-    columns.add(stringData);
-    columnDefs.put("stringdata",stringData);
-    ColumnSchema timestampdata = new ColumnSchema.ColumnSchemaBuilder("timestampdata", Type.UNIXTIME_MICROS)
-        .build();
-    columns.add(timestampdata);
-    columnDefs.put("timestampdata",timestampdata);
-    ColumnSchema binarydata = new ColumnSchema.ColumnSchemaBuilder("binarydata", Type.BINARY)
-        .build();
-    columns.add(binarydata);
-    columnDefs.put("binarydata",binarydata);
-    ColumnSchema floatdata = new ColumnSchema.ColumnSchemaBuilder("floatdata", Type.FLOAT)
-        .build();
-    columns.add(floatdata);
-    columnDefs.put("floatdata",floatdata);
-    ColumnSchema booldata = new ColumnSchema.ColumnSchemaBuilder("booldata", Type.BOOL)
-        .build();
-    columns.add(booldata);
-    columnDefs.put("booldata",booldata);
-    List<String> rangeKeys = new ArrayList<>();
-    rangeKeys.add("stringrowkey");
-    rangeKeys.add("timestamprowkey");
-    List<String> hashPartitions = new ArrayList<>();
-    hashPartitions.add("introwkey");
-    Schema schema = new Schema(columns);
-    try {
-      client.createTable(tableName, schema,
-          new CreateTableOptions()
-          .setNumReplicas(1)
-          .setRangePartitionColumns(rangeKeys)
-          .addHashPartitions(hashPartitions,2));
-    } catch (KuduException e) {
-      LOG.error("Error while creating table for unit tests " + e.getMessage(), e);
-      throw e;
-    }
-  }
-
-  private void lookUpAndPopulateRecord(UnitTestTablePojo keyInfo) throws Exception
-  {
-    KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable)
-        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("introwkey"),
-        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getIntrowkey()))
-        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("stringrowkey"),
-        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getStringrowkey()))
-        .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("timestamprowkey"),
-        KuduPredicate.ComparisonOp.EQUAL,keyInfo.getTimestamprowkey()))
-        .build();
-    RowResultIterator rowResultItr = scanner.nextRows();
-    while (rowResultItr.hasNext()) {
-      RowResult thisRow = rowResultItr.next();
-      keyInfo.setFloatdata(thisRow.getFloat("floatdata"));
-      keyInfo.setBooldata(thisRow.getBoolean("booldata"));
-      keyInfo.setBinarydata(thisRow.getBinary("binarydata"));
-      keyInfo.setLongdata(thisRow.getLong("longdata"));
-      keyInfo.setTimestampdata(thisRow.getLong("timestampdata"));
-      keyInfo.setStringdata("stringdata");
-      break;
-    }
-  }
 
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
   @Test
   public void processForUpdate() throws Exception
   {
@@ -235,6 +112,7 @@ public class KuduCreateUpdateDeleteOutputOperatorTest
     assertEquals(unitTestTablePojoRead.isBooldata(), false);
   }
 
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
   @Test
   public void processForUpsert() throws Exception
   {
@@ -267,6 +145,7 @@ public class KuduCreateUpdateDeleteOutputOperatorTest
     assertEquals(unitTestTablePojoRead.isBooldata(), true);
   }
 
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
   @Test
   public void processForDelete() throws Exception
   {
@@ -304,6 +183,7 @@ public class KuduCreateUpdateDeleteOutputOperatorTest
     assertEquals(unitTestTablePojoRead.getBinarydata(), null);
   }
 
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
   @Test
   public void processForInsert() throws Exception
   {
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduInputOperatorCommons.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduInputOperatorCommons.java
new file mode 100644
index 0000000..19a815c
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduInputOperatorCommons.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduScanOrderStrategy;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.apache.kudu.client.Upsert;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.TestPortContext;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+public class KuduInputOperatorCommons extends KuduClientTestCommons
+{
+  private static final Logger LOG = LoggerFactory.getLogger(KuduInputOperatorCommons.class);
+
+  public static final String APP_ID = "TestKuduInputOperatorOneToOnePartitioner";
+  public static final int OPERATOR_ID_FOR_ONE_TO_ONE_PARTITIONER = 1;
+
+  protected Context.OperatorContext operatorContext;
+  protected TestPortContext testPortContext;
+  protected UnitTestStepwiseScanInputOperator unitTestStepwiseScanInputOperator;
+  protected Collection<Partitioner.Partition<AbstractKuduInputOperator>> partitions;
+
+  protected  KuduPartitionScanStrategy partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+
+  protected  KuduScanOrderStrategy scanOrderStrategy = KuduScanOrderStrategy.RANDOM_ORDER_SCANNER;
+
+  protected static int numberOfKuduInputOperatorPartitions = 5;
+
+  protected static Partitioner.PartitioningContext partitioningContext;
+
+  public void initOperatorState() throws Exception
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributeMapForInputOperator =
+        new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMapForInputOperator.put(DAG.APPLICATION_ID, APP_ID);
+    operatorContext = mockOperatorContext(OPERATOR_ID_FOR_ONE_TO_ONE_PARTITIONER,
+      attributeMapForInputOperator);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributesForInputOperator =
+        new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributesForInputOperator.put(Context.PortContext.TUPLE_CLASS, UnitTestTablePojo.class);
+    testPortContext = new TestPortContext(portAttributesForInputOperator);
+
+    unitTestStepwiseScanInputOperator = new UnitTestStepwiseScanInputOperator(
+        getConnectionConfigForTable(), UnitTestTablePojo.class);
+    unitTestStepwiseScanInputOperator.setNumberOfPartitions(numberOfKuduInputOperatorPartitions);
+    unitTestStepwiseScanInputOperator.setPartitionScanStrategy(partitonScanStrategy);
+    unitTestStepwiseScanInputOperator.setScanOrderStrategy(scanOrderStrategy);
+    initCommonConfigsForAllTypesOfTests();
+    partitions = unitTestStepwiseScanInputOperator.definePartitions(
+      new ArrayList(), partitioningContext);
+    Iterator<Partitioner.Partition<AbstractKuduInputOperator>> iteratorForMeta = partitions.iterator();
+    UnitTestStepwiseScanInputOperator actualOperator =
+        (UnitTestStepwiseScanInputOperator)iteratorForMeta.next()
+        .getPartitionedInstance();
+    // Adjust the bindings as if Apex has completed the partitioning. The runtime of the framework does this in reality
+    unitTestStepwiseScanInputOperator = actualOperator;
+    unitTestStepwiseScanInputOperator.setup(operatorContext);
+    unitTestStepwiseScanInputOperator.activate(operatorContext);
+    //rewire parent operator to enable proper unit testing method calls
+    unitTestStepwiseScanInputOperator.getPartitioner().setPrototypeKuduInputOperator(unitTestStepwiseScanInputOperator);
+    unitTestStepwiseScanInputOperator.getScanner().setParentOperator(unitTestStepwiseScanInputOperator);
+  }
+
+  public static void initCommonConfigsForAllTypesOfTests() throws Exception
+  {
+    KuduClientTestCommons.buildSchemaForUnitTestsTable();
+    partitioningContext = new Partitioner.PartitioningContext()
+    {
+      @Override
+      public int getParallelPartitionCount()
+      {
+        return numberOfKuduInputOperatorPartitions;
+      }
+
+      @Override
+      public List<Operator.InputPort<?>> getInputPorts()
+      {
+        return null;
+      }
+    };
+  }
+
+  public void truncateTable() throws Exception
+  {
+    AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> scannerForDeletingRows =
+        unitTestStepwiseScanInputOperator.getScanner();
+    List<KuduScanToken> scansForAllTablets = unitTestStepwiseScanInputOperator
+        .getPartitioner().getKuduScanTokensForSelectAllColumns();
+    ApexKuduConnection aCurrentConnection = scannerForDeletingRows.getConnectionPoolForThreads().get(0);
+    KuduSession aSessionForDeletes = aCurrentConnection.getKuduClient().newSession();
+    KuduTable currentTable = aCurrentConnection.getKuduTable();
+    for ( KuduScanToken aTabletScanToken : scansForAllTablets) {
+      KuduScanner aScanner = aTabletScanToken.intoScanner(aCurrentConnection.getKuduClient());
+      while ( aScanner.hasMoreRows()) {
+        RowResultIterator itrForRows = aScanner.nextRows();
+        while ( itrForRows.hasNext()) {
+          RowResult aRow = itrForRows.next();
+          int intRowKey = aRow.getInt("introwkey");
+          String stringRowKey = aRow.getString("stringrowkey");
+          long timestampRowKey = aRow.getLong("timestamprowkey");
+          Delete aDeleteOp = currentTable.newDelete();
+          aDeleteOp.getRow().addInt("introwkey",intRowKey);
+          aDeleteOp.getRow().addString("stringrowkey", stringRowKey);
+          aDeleteOp.getRow().addLong("timestamprowkey",timestampRowKey);
+          aSessionForDeletes.apply(aDeleteOp);
+        }
+      }
+    }
+    aSessionForDeletes.close();
+    Thread.sleep(2000); // Sleep to allow for scans to complete
+  }
+
+  public void addTestDataRows(int numRowsInEachPartition) throws Exception
+  {
+    int intRowKeyStepsize = Integer.MAX_VALUE / SPLIT_COUNT_FOR_INT_ROW_KEY;
+    int splitBoundaryForIntRowKey = intRowKeyStepsize;
+    int[] inputrowkeyPartitionEntries = new int[SPLIT_COUNT_FOR_INT_ROW_KEY + 1];
+    // setting the int keys that will fall in the range of all partitions
+    for ( int i = 0; i < SPLIT_COUNT_FOR_INT_ROW_KEY; i++) {
+      inputrowkeyPartitionEntries[i] = splitBoundaryForIntRowKey + 3; // 3 to fall into the partition next to boundary
+      splitBoundaryForIntRowKey += intRowKeyStepsize;
+    }
+    inputrowkeyPartitionEntries[SPLIT_COUNT_FOR_INT_ROW_KEY] = splitBoundaryForIntRowKey + 3;
+    AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> scannerForAddingRows =
+        unitTestStepwiseScanInputOperator.getScanner();
+    ApexKuduConnection aCurrentConnection = scannerForAddingRows.getConnectionPoolForThreads().get(0);
+    KuduSession aSessionForInserts = aCurrentConnection.getKuduClient().newSession();
+    KuduTable currentTable = aCurrentConnection.getKuduTable();
+    long seedValueForTimestampRowKey = 0L; // constant to allow for data landing on first partition for unit tests
+    for ( int i = 0; i <= SPLIT_COUNT_FOR_INT_ROW_KEY; i++) { // range key iterator
+      int intRowKeyBaseValue = inputrowkeyPartitionEntries[i] + i;
+      for ( int k = 0; k < 2; k++) { // hash key iterator . The table defines two hash partitions
+        long timestampRowKeyValue = seedValueForTimestampRowKey + k; // to avoid spilling to another tablet
+        String stringRowKeyValue = "" + timestampRowKeyValue + k; // to avoid spilling to another tablet randomly
+        for ( int y = 0; y < numRowsInEachPartition; y++) {
+          Upsert aNewRow = currentTable.newUpsert();
+          PartialRow rowValue  = aNewRow.getRow();
+          // Start assigning row keys below the current split boundary.
+          rowValue.addInt("introwkey",intRowKeyBaseValue - y - 1);
+          rowValue.addString("stringrowkey",stringRowKeyValue);
+          rowValue.addLong("timestamprowkey",timestampRowKeyValue);
+          rowValue.addLong("longdata",(seedValueForTimestampRowKey + y));
+          rowValue.addString("stringdata", ("" + seedValueForTimestampRowKey + y));
+          OperationResponse response = aSessionForInserts.apply(aNewRow);
+        }
+      }
+    }
+    List<OperationResponse> insertResponse = aSessionForInserts.flush();
+    aSessionForInserts.close();
+    Thread.sleep(2000); // Sleep to allow for scans to complete
+  }
+
+  public long countNumRowsInTable() throws Exception
+  {
+    List<String> allProjectedCols = new ArrayList<>(
+        unitTestStepwiseScanInputOperator.getKuduColNameToSchemaMapping().keySet());
+    KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable)
+        .setProjectedColumnNames(allProjectedCols)
+        .build();
+    long counter = 0;
+    while (scanner.hasMoreRows()) {
+      RowResultIterator rowResultItr = scanner.nextRows();
+      while (rowResultItr.hasNext()) {
+        RowResult thisRow = rowResultItr.next();
+        counter += 1;
+      }
+    }
+    return counter;
+  }
+}
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/SimpleKuduOutputOperator.java
similarity index 81%
rename from contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
rename to kudu/src/test/java/org/apache/apex/malhar/kudu/SimpleKuduOutputOperator.java
index 3933df7..81e93b7 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/SimpleKuduOutputOperator.java
@@ -16,19 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
+import org.junit.Rule;
+
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
 import org.apache.kudu.client.ExternalConsistencyMode;
 import org.apache.kudu.client.SessionConfiguration;
 
 public class SimpleKuduOutputOperator extends AbstractKuduOutputOperator
 {
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
   @Override
   ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig()
   {
     return new ApexKuduConnection.ApexKuduConnectionBuilder()
-        .withAPossibleMasterHostAs("localhost:7051")
-        .withTableName("unittests")
+        .withAPossibleMasterHostAs(KuduClientTestCommons.kuduMasterAddresses)
+        .withTableName(KuduClientTestCommons.tableName)
         .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
         .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
         .withNumberOfBossThreads(1)
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestStepwiseScanInputOperator.java
similarity index 53%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestStepwiseScanInputOperator.java
index 64b46c6..5f98411 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestStepwiseScanInputOperator.java
@@ -16,17 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
- */
-public enum KuduMutationType
+
+public class UnitTestStepwiseScanInputOperator extends AbstractKuduInputOperator<UnitTestTablePojo,InputOperatorControlTuple>
 {
+  private int start = 0;
+
+  private int step = 2;
+
+  public UnitTestStepwiseScanInputOperator(ApexKuduConnection.ApexKuduConnectionBuilder kuduConnectionInfo,
+      Class<UnitTestTablePojo> clazzForPOJO) throws Exception
+  {
+    super(kuduConnectionInfo, clazzForPOJO);
+  }
+
+  public UnitTestStepwiseScanInputOperator()
+  {
+  }
 
-    INSERT,
-    DELETE,
-    UPDATE,
-    UPSERT
+  @Override
+  protected String getNextQuery()
+  {
+    return "select timestamprowkey,longdata,stringdata from unittests where introwkey >= " + start +
+        " and introwkey <= " + (start++ + step);
+  }
 }
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestTablePojo.java
similarity index 83%
rename from contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
rename to kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestTablePojo.java
index dc2cc33..da69ba1 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestTablePojo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
 
 
 import java.nio.ByteBuffer;
@@ -122,4 +122,20 @@ public class UnitTestTablePojo
   {
     this.booldata = booldata;
   }
+
+  @Override
+  public String toString()
+  {
+    return "UnitTestTablePojo{" +
+      "introwkey=" + introwkey +
+      ", stringrowkey='" + stringrowkey + '\'' +
+      ", timestamprowkey=" + timestamprowkey +
+      ", longdata=" + longdata +
+      ", stringdata='" + stringdata + '\'' +
+      ", timestampdata=" + timestampdata +
+      ", binarydata=" + binarydata +
+      ", floatdata=" + floatdata +
+      ", booldata=" + booldata +
+      '}';
+  }
 }
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitionerTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitionerTest.java
new file mode 100644
index 0000000..6beea0b
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitionerTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+import org.apache.kudu.client.KuduScanToken;
+
+import com.datatorrent.api.Partitioner;
+
+import static org.junit.Assert.assertEquals;
+
+public class AbstractKuduInputPartitionerTest extends KuduInputOperatorCommons
+{
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testKuduSelectAllScanTokens() throws Exception
+  {
+    initOperatorState();
+    AbstractKuduInputPartitioner partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+    List<KuduScanToken> allScanTokens = partitioner.getKuduScanTokensForSelectAllColumns();
+    assertEquals(KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE,allScanTokens.size());
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testNumberOfPartitions() throws Exception
+  {
+    numberOfKuduInputOperatorPartitions = -1; // situation when the user has not set the partitions explicitly
+    initOperatorState();
+    AbstractKuduInputPartitioner partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+    assertEquals(1,partitioner.getNumberOfPartitions(partitioningContext));
+    numberOfKuduInputOperatorPartitions = 7; // situation when the user has explicitly set the partitions
+    initOperatorState();
+    partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+    assertEquals(7,partitioner.getNumberOfPartitions(partitioningContext));
+    numberOfKuduInputOperatorPartitions = -1; // revert the value back to default
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testDefinePartitions() throws Exception
+  {
+    // case of setting num partitions to 7 but setting one to one mapping.
+    numberOfKuduInputOperatorPartitions = 7;
+    initOperatorState();
+    AbstractKuduInputPartitioner partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+    assertEquals(KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE,partitioner.definePartitions(
+        new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(),partitioningContext).size());
+    // case of setting num partition to 7 but go with many to one mapping
+    partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+    initOperatorState();
+    partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+    Collection<Partitioner.Partition<AbstractKuduInputOperator>> partitionsCalculated = partitioner.definePartitions(
+        new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(),partitioningContext);
+    assertEquals(7,partitionsCalculated.size());
+    int maxPartitionPerOperator = -1;
+    int minPartitionPerOperator = Integer.MAX_VALUE;
+    Iterator<Partitioner.Partition<AbstractKuduInputOperator>> iteratorForPartitionScan =
+        partitionsCalculated.iterator();
+    while ( iteratorForPartitionScan.hasNext()) {
+      Partitioner.Partition<AbstractKuduInputOperator> anOperatorPartition = iteratorForPartitionScan.next();
+      AbstractKuduInputOperator<UnitTestTablePojo,InputOperatorControlTuple> anOperatorHandle =
+          anOperatorPartition.getPartitionedInstance();
+      List<KuduPartitionScanAssignmentMeta> partitionsAssigned = anOperatorHandle.getPartitionPieAssignment();
+      if (  partitionsAssigned.size() > maxPartitionPerOperator ) {
+        maxPartitionPerOperator = partitionsAssigned.size();
+      }
+      if ( partitionsAssigned.size() < minPartitionPerOperator) {
+        minPartitionPerOperator = partitionsAssigned.size();
+      }
+    }
+    assertEquals(2,maxPartitionPerOperator); // 7 kudu operator instances dealing with 12 kudu tablets
+    assertEquals(1,minPartitionPerOperator); // 7 kudu operator instances dealing with 12 kudu tablets
+    // revert all the changes to defaults for the next test
+    numberOfKuduInputOperatorPartitions = 5;
+    partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+  }
+
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitionerTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitionerTest.java
new file mode 100644
index 0000000..97768d2
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitionerTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.junit.MatcherAssert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KryoCloneUtils.class})
+public class KuduOneToManyPartitionerTest extends KuduInputOperatorCommons
+{
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+  @KuduClusterTestContext(kuduClusterBasedTest = false)
+  @Test
+  public void testAssignPartitions() throws Exception
+  {
+    AbstractKuduInputOperator<UnitTestTablePojo,InputOperatorControlTuple> mockedInputOperator =
+        PowerMockito.mock(AbstractKuduInputOperator.class);
+    when(mockedInputOperator.getNumberOfPartitions()).thenReturn(5);
+    PowerMockito.mockStatic(KryoCloneUtils.class);
+    when(KryoCloneUtils.cloneObject(mockedInputOperator)).thenReturn(mockedInputOperator);
+    KuduOneToManyPartitioner kuduOneToManyPartitioner = new KuduOneToManyPartitioner(mockedInputOperator);
+    buildMockWiring(mockedInputOperator, KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE);
+    kuduOneToManyPartitioner.setPrototypeKuduInputOperator(mockedInputOperator);
+    Map<Integer,List<KuduPartitionScanAssignmentMeta>> assignedPartitions = kuduOneToManyPartitioner.assign(
+        kuduOneToManyPartitioner.getListOfPartitionAssignments(
+        new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(),
+        partitioningContext),partitioningContext);
+    assertThat(assignedPartitions.size(), is(5));
+    for (List<KuduPartitionScanAssignmentMeta> eachOperatorassignment: assignedPartitions.values()) {
+      assertThat(eachOperatorassignment.size(), lessThanOrEqualTo(3));
+    }
+    for (List<KuduPartitionScanAssignmentMeta> eachOperatorassignment: assignedPartitions.values()) {
+      assertThat(eachOperatorassignment.size(), greaterThanOrEqualTo(2));
+    }
+  }
+
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitionerTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitionerTest.java
new file mode 100644
index 0000000..7d0d417
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitionerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.junit.MatcherAssert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KryoCloneUtils.class})
+public class KuduOneToOnePartitionerTest extends KuduInputOperatorCommons
+{
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+  @KuduClusterTestContext(kuduClusterBasedTest = false)
+  @Test
+  public void testAssignPartitions() throws Exception
+  {
+    AbstractKuduInputOperator<UnitTestTablePojo,InputOperatorControlTuple> mockedInputOperator =
+        PowerMockito.mock(AbstractKuduInputOperator.class);
+    when(mockedInputOperator.getNumberOfPartitions()).thenReturn(9);
+    PowerMockito.mockStatic(KryoCloneUtils.class);
+    when(KryoCloneUtils.cloneObject(mockedInputOperator)).thenReturn(mockedInputOperator);
+    KuduOneToOnePartitioner kuduOneToOnePartitioner = new KuduOneToOnePartitioner(mockedInputOperator);
+    buildMockWiring(mockedInputOperator, KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE);
+    kuduOneToOnePartitioner.setPrototypeKuduInputOperator(mockedInputOperator);
+    Map<Integer,List<KuduPartitionScanAssignmentMeta>> assignedPartitions = kuduOneToOnePartitioner.assign(
+        kuduOneToOnePartitioner.getListOfPartitionAssignments(
+        new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(),
+        partitioningContext),partitioningContext);
+    assertThat(assignedPartitions.size(), is(12));
+    for (List<KuduPartitionScanAssignmentMeta> eachOperatorassignment: assignedPartitions.values()) {
+      assertThat(eachOperatorassignment.size(), is(1));
+    }
+  }
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScannerTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScannerTest.java
new file mode 100644
index 0000000..0e6fcd3
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScannerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+import org.apache.kudu.ColumnSchema;
+
+import static org.junit.Assert.assertEquals;
+
+public class AbstractKuduPartitionScannerTest extends KuduInputOperatorCommons
+{
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testPreparePlanForScanners() throws Exception
+  {
+    // no predicates test
+    initOperatorState();
+    AbstractKuduPartitionScanner<UnitTestTablePojo, InputOperatorControlTuple> scanner =
+        unitTestStepwiseScanInputOperator.getScanner();
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        "select introwkey as intColumn from unittests",
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    List<KuduPartitionScanAssignmentMeta> scansForThisQuery = scanner.preparePlanForScanners(translator);
+    // No predicates and hence a full scan and 1-1 implies one partition for this attempt
+    assertEquals(1,scansForThisQuery.size());
+    // many to one partitioner and no predicates
+    numberOfKuduInputOperatorPartitions = 4;
+    partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+    initOperatorState();
+    translator = new SQLToKuduPredicatesTranslator(
+      "select introwkey as intColumn from unittests",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    scanner = unitTestStepwiseScanInputOperator.getScanner();
+    scansForThisQuery = scanner.preparePlanForScanners(translator);
+    assertEquals(3,scansForThisQuery.size()); // 12 kudu partitions over 4 Apex partitions = 3
+    // many to one partitioner and no predicates
+    numberOfKuduInputOperatorPartitions = 4;
+    partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+    initOperatorState();
+    translator = new SQLToKuduPredicatesTranslator(
+      "select introwkey as intColumn from unittests where introwkey = 1",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    scanner = unitTestStepwiseScanInputOperator.getScanner();
+    scansForThisQuery = scanner.preparePlanForScanners(translator);
+    // This query will actually result in two tablet scans as there is a hash partition involved as well apart from
+    // range partitioning. However this operator is getting one scan as part of the pie assignment. Hence assert 1
+    assertEquals(1,scansForThisQuery.size());
+
+    //revert all changes back for subsequent tests
+    numberOfKuduInputOperatorPartitions = 5;
+    partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testVerifyConnectionStaleness() throws Exception
+  {
+    initOperatorState();
+    AbstractKuduPartitionScanner<UnitTestTablePojo, InputOperatorControlTuple> scanner =
+        unitTestStepwiseScanInputOperator.getScanner();
+    scanner.connectionPoolForThreads.get(0).close(); // forcefully close
+    assertEquals(true,scanner.connectionPoolForThreads.get(0).getKuduSession().isClosed());
+    scanner.verifyConnectionStaleness(0);
+    assertEquals(false,scanner.connectionPoolForThreads.get(0).getKuduSession().isClosed());
+  }
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallableTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallableTest.java
new file mode 100644
index 0000000..8d140c6
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallableTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+import org.apache.kudu.ColumnSchema;
+
+import static org.junit.Assert.assertEquals;
+
+public class KuduPartitionScannerCallableTest extends KuduInputOperatorCommons
+{
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testSettersForPojo() throws Exception
+  {
+    initOperatorState();
+    AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> currentScanner =
+        unitTestStepwiseScanInputOperator.getScanner();
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        "select introwkey as intColumn from unittests",
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    List<KuduPartitionScanAssignmentMeta> scansForThisQuery = currentScanner.preparePlanForScanners(translator);
+    KuduPartitionScannerCallable<UnitTestTablePojo,InputOperatorControlTuple> threadToScan = new
+        KuduPartitionScannerCallable<>(unitTestStepwiseScanInputOperator,scansForThisQuery.get(0),
+        currentScanner.getConnectionPoolForThreads().get(0),
+        unitTestStepwiseScanInputOperator.extractSettersForResultObject(translator),translator);
+    long countOfScan = threadToScan.call();
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testRowScansForAllDataAcrossAllPartitions() throws Exception
+  {
+    partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+    numberOfKuduInputOperatorPartitions = 1;
+    initOperatorState();
+    AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> currentScanner =
+        unitTestStepwiseScanInputOperator.getScanner();
+    // truncate and add some data to the unit test table
+    truncateTable();
+    addTestDataRows(10); // This is per partition and there are 12 partitions
+    assertEquals((KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE * 10 ),countNumRowsInTable());
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        "select * from unittests",
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    List<KuduPartitionScanAssignmentMeta> scansForThisQuery = currentScanner.preparePlanForScanners(translator);
+    // Now scan for exact match of counts
+    long totalRowsRead = 0;
+    unitTestStepwiseScanInputOperator.getBuffer().clear();
+    for (KuduPartitionScanAssignmentMeta aSegmentToScan :  scansForThisQuery) {
+      KuduPartitionScannerCallable<UnitTestTablePojo,InputOperatorControlTuple> threadToScan = new
+          KuduPartitionScannerCallable<>(unitTestStepwiseScanInputOperator,aSegmentToScan,
+          currentScanner.getConnectionPoolForThreads().get(0),
+          unitTestStepwiseScanInputOperator.extractSettersForResultObject(translator),translator);
+      totalRowsRead += threadToScan.call();
+    }
+    // 144 = 120 records + 12 * 2 markers
+    int expectedCount = ( 10 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE) +
+        ( 2 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE);
+    assertEquals(expectedCount,unitTestStepwiseScanInputOperator.getBuffer().size());
+    // revert all configs to default
+    partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+    numberOfKuduInputOperatorPartitions = 5;
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = true)
+  @Test
+  public void testRowScansForAllDataInSinglePartition() throws Exception
+  {
+    partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+    initOperatorState();
+    AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> currentScanner =
+        unitTestStepwiseScanInputOperator.getScanner();
+    // truncate and add some data to the unit test table
+    truncateTable();
+    addTestDataRows(10); // This is per partition and there are 12 partitions
+    assertEquals((10 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE),countNumRowsInTable());
+    int intRowBoundary = Integer.MAX_VALUE / SPLIT_COUNT_FOR_INT_ROW_KEY; // divide by 5 to allow for the scan to fall in the lower range
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        "select * from unittests where introwkey < " + intRowBoundary,
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    List<KuduPartitionScanAssignmentMeta> scansForThisQuery = currentScanner.preparePlanForScanners(translator);
+    // Now scan for exact match of counts
+    long totalRowsRead = 0;
+    unitTestStepwiseScanInputOperator.getBuffer().clear();
+    for (KuduPartitionScanAssignmentMeta aSegmentToScan :  scansForThisQuery) {
+      KuduPartitionScannerCallable<UnitTestTablePojo,InputOperatorControlTuple> threadToScan = new
+          KuduPartitionScannerCallable<>(unitTestStepwiseScanInputOperator,aSegmentToScan,
+          currentScanner.getConnectionPoolForThreads().get(0),
+          unitTestStepwiseScanInputOperator.extractSettersForResultObject(translator),translator);
+      totalRowsRead += threadToScan.call();
+    }
+    // 23  because of the hash distributions and the scan markers. 21 are data records and 2 are end of scan markers
+    assertEquals(23L,unitTestStepwiseScanInputOperator.getBuffer().size());
+    // revert all configs to default
+    partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+    numberOfKuduInputOperatorPartitions = 5;
+  }
+
+
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslatorTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslatorTest.java
new file mode 100644
index 0000000..59936fa
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslatorTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import java.util.ArrayList;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+import org.apache.kudu.ColumnSchema;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+public class SQLToKuduPredicatesTranslatorTest extends KuduClientTestCommons
+{
+  @Rule
+  public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+  @KuduClusterTestContext(kuduClusterBasedTest = false)
+  @Test
+  public void testForCompletenessOfSQLExpression() throws Exception
+  {
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        "select from unittests",new ArrayList<ColumnSchema>(columnDefs.values()));
+    translator.parseKuduExpression();
+    KuduSQLExpressionErrorListener errorListener = translator.getErrorListener();
+    assertEquals(true,errorListener.isSyntaxError());
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = false)
+  @Test
+  public void testForErrorsInColumnAliasesInSQLExpression() throws Exception
+  {
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        "select intkey as intColumn from unittests",
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    KuduSQLExpressionErrorListener errorListener = translator.getErrorListener();
+    assertEquals(false,errorListener.isSyntaxError());
+
+    translator = new SQLToKuduPredicatesTranslator(
+      "select intkey as intColumn, 'from' as fgh from unittests",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    errorListener = translator.getErrorListener();
+    assertEquals(false,errorListener.isSyntaxError());
+
+    translator = new SQLToKuduPredicatesTranslator(
+      "select intkey, 'from' from unittests",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    errorListener = translator.getErrorListener();
+    assertEquals(false,errorListener.isSyntaxError());
+
+    translator = new SQLToKuduPredicatesTranslator(
+      "select intkey, 'from' as fgh from unittests",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    errorListener = translator.getErrorListener();
+    assertEquals(false,errorListener.isSyntaxError());
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = false)
+  @Test
+  public void testForErrorsInOptionsInSQLExpression() throws Exception
+  {
+    SQLToKuduPredicatesTranslator translator = null;
+    KuduSQLExpressionErrorListener errorListener = null;
+
+    translator = new SQLToKuduPredicatesTranslator(
+      "select intkey as intColumn from unittests using Options",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    errorListener = translator.getErrorListener();
+    assertEquals(true,errorListener.isSyntaxError());
+
+    translator = new SQLToKuduPredicatesTranslator(
+      "select intkey as intColumn, 'from' as fgh from " +
+        " unittests using options READ_SNAPSHOT_TIME = aASDAD",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    errorListener = translator.getErrorListener();
+    assertEquals(true,errorListener.isSyntaxError());
+
+    translator = new SQLToKuduPredicatesTranslator(
+      "select intkey, 'from' from unittests using options READ_SNAPSHOT_TIME = 2342345",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    errorListener = translator.getErrorListener();
+    assertEquals(false,errorListener.isSyntaxError());
+
+    translator = new SQLToKuduPredicatesTranslator(
+      "select intkey, xcv as fgh from unittests using options CONTROLTUPLE_MESSAGE = \"done\"",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    errorListener = translator.getErrorListener();
+    assertEquals(false,errorListener.isSyntaxError());
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = false)
+  @Test
+  public void testForSelectStarInSQLExpression() throws Exception
+  {
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        " select * from unittests",
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    assertEquals(true,translator.getKuduSQLParseTreeListener().isSelectStarExpressionEnabled());
+
+    translator = new SQLToKuduPredicatesTranslator(
+      "select intkey as intColumn from unittests ",
+      new ArrayList<ColumnSchema>(columnDefs.values()));
+    assertEquals(false,translator.getKuduSQLParseTreeListener().isSelectStarExpressionEnabled());
+
+  }
+
+  @KuduClusterTestContext(kuduClusterBasedTest = false)
+  @Test
+  public void testForColumnNameExtractionInSQLExpression() throws Exception
+  {
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        " select introwkey as intColumn, '      from' as flColumn, stringCol from unittests",
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    assertEquals(1, translator.getKuduSQLParseTreeListener().getListOfColumnsUsed().size());
+    assertEquals(9, translator.getKuduSQLParseTreeListener().getAliases().size());
+    assertEquals("intColumn", translator.getKuduSQLParseTreeListener().getAliases().get("introwkey"));
+  }
+
+  @Test
+  @KuduClusterTestContext(kuduClusterBasedTest = false)
+  public void testForReadSnapshotTimeExpression() throws Exception
+  {
+    SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+        " select introwkey as intColumn using options read_snapshot_time = 12345",
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    assertEquals(12345L, translator.getKuduSQLParseTreeListener().getReadSnapshotTime().longValue());
+    SQLToKuduPredicatesTranslator translatorForNoReadSnapshotTime = new SQLToKuduPredicatesTranslator(
+        " select introwkey as intColumn",
+        new ArrayList<ColumnSchema>(columnDefs.values()));
+    assertEquals(null, translatorForNoReadSnapshotTime.getKuduSQLParseTreeListener().getReadSnapshotTime());
+
+  }
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterAvailabilityTestRule.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterAvailabilityTestRule.java
new file mode 100644
index 0000000..1b771ab
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterAvailabilityTestRule.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.BaseKuduOutputOperator;
+import org.apache.apex.malhar.kudu.IncrementalStepScanInputOperator;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+
+/**
+ * A JUnit rule that helps in bypassing tests that cannot be run if the Kudu cluster is not present.
+ */
+public class KuduClusterAvailabilityTestRule implements TestRule
+{
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(KuduClusterAvailabilityTestRule.class);
+
+  public static final String KUDU_MASTER_HOSTS_STRING_ENV = "kudumasterhosts";
+  @Override
+  public Statement apply(Statement base, Description description)
+  {
+    KuduClusterTestContext testContext = description.getAnnotation(KuduClusterTestContext.class);
+    String masterHostsValue = System.getProperty(KUDU_MASTER_HOSTS_STRING_ENV);
+    boolean runThisTest = true; // default is to run the test if no annotation is specified
+    if ( testContext != null) {
+      if ((masterHostsValue != null) && (testContext.kuduClusterBasedTest())) {
+        runThisTest = true;
+      }
+      if ((masterHostsValue != null) && (!testContext.kuduClusterBasedTest())) {
+        runThisTest = false;
+      }
+      if ((masterHostsValue == null) && (testContext.kuduClusterBasedTest())) {
+        runThisTest = false;
+      }
+      if ((masterHostsValue == null) && (!testContext.kuduClusterBasedTest())) {
+        runThisTest = true;
+      }
+    }
+    if (runThisTest) {
+      // call the before class equivalent for real kudu cluster
+      if ((testContext != null ) && (testContext.kuduClusterBasedTest())) {
+        try {
+          initRealKuduClusterSetup(masterHostsValue);
+        } catch (Exception e) {
+          throw new RuntimeException("Not able to initialize kudu cluster",e);
+        }
+      } else {
+        // call the before class equivalent for mocked kudu cluster
+        try {
+          KuduInputOperatorCommons.initCommonConfigsForAllTypesOfTests();
+        } catch (Exception e) {
+          throw new RuntimeException("Could not initialize common configs required for Kudu cluster connectivity",e);
+        }
+      }
+      // Run the original test
+      return base;
+    } else {
+      // bypass the test altogether
+      return new Statement()
+      {
+
+        @Override
+        public void evaluate() throws Throwable
+        {
+          // Return an empty Statement object for those tests
+        }
+      };
+    }
+
+  }
+
+  private void initRealKuduClusterSetup(String masterHosts) throws Exception
+  {
+    if (!KuduClientTestCommons.tableInitialized) {
+      synchronized (KuduClientTestCommons.objectForLocking) { // test rigs can be parallelized
+        if (!KuduClientTestCommons.tableInitialized) {
+          KuduClientTestCommons.setKuduMasterAddresses(masterHosts);
+          KuduClientTestCommons.setup();
+          initPropertiesFilesForBaseKuduOutputOperator(masterHosts);
+          initPropertiesFilesForIncrementalStepScanInputOperator(masterHosts);
+          KuduClientTestCommons.tableInitialized = true;
+        }
+      }
+    }
+  }
+
+  private void initPropertiesFilesForBaseKuduOutputOperator(String masterHosts)
+  {
+    BaseKuduOutputOperator.DEFAULT_CONNECTION_PROPS_FILE_NAME = rewritePropsFile(
+      masterHosts,BaseKuduOutputOperator.DEFAULT_CONNECTION_PROPS_FILE_NAME);
+  }
+
+  private void initPropertiesFilesForIncrementalStepScanInputOperator(String masterHosts)
+  {
+    IncrementalStepScanInputOperator.DEFAULT_CONNECTION_PROPS_FILE_NAME = rewritePropsFile(
+      masterHosts,IncrementalStepScanInputOperator.DEFAULT_CONNECTION_PROPS_FILE_NAME);
+  }
+
+  /**
+   * Creates a temporary properties file that sets the kudu cluster master hosts value as given in the command line.
+   * @param masterHosts
+   * @param originalFileName File name as would be picked by the class loader
+   * @return
+   */
+  private String rewritePropsFile(String masterHosts, String originalFileName)
+  {
+    Properties kuduConnectionProperties = new Properties();
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    InputStream kuduPropsFileAsStream = loader.getResourceAsStream(originalFileName);
+    File tempPropertiesFile = null;
+    try {
+      tempPropertiesFile = File.createTempFile("kuduclusterconfig-" + System.currentTimeMillis(),
+          ".properties");
+      LOG.info(tempPropertiesFile.getAbsolutePath() + " is being used as a temporary props file");
+    } catch (IOException e) {
+      LOG.error("Not able to create temp file",e);
+      return originalFileName;
+    }
+    if (kuduPropsFileAsStream != null) {
+      try {
+        kuduConnectionProperties.load(kuduPropsFileAsStream);
+        kuduPropsFileAsStream.close();
+        FileOutputStream toBeModifiedFileHandle = new FileOutputStream(tempPropertiesFile);
+        kuduConnectionProperties.put(BaseKuduOutputOperator.MASTER_HOSTS,masterHosts);
+        kuduConnectionProperties.store(toBeModifiedFileHandle,"updated by unit test");
+        toBeModifiedFileHandle.close();
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+    return tempPropertiesFile.getAbsolutePath();
+  }
+
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterTestContext.java
similarity index 62%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
rename to kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterTestContext.java
index 64b46c6..80ca0af 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterTestContext.java
@@ -16,17 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu.test;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
 /**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+ * Annotation that helps in selectively triggering certain tests if Kudu cluster is available at the time of
+ * launching of the tests
  */
-public enum KuduMutationType
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface KuduClusterTestContext
 {
 
-    INSERT,
-    DELETE,
-    UPDATE,
-    UPSERT
+  boolean kuduClusterBasedTest() default false;
+
 }
diff --git a/contrib/src/test/resources/kuduoutputoperator.properties b/kudu/src/test/resources/kuduincrementalstepscaninputoperator.properties
similarity index 70%
copy from contrib/src/test/resources/kuduoutputoperator.properties
copy to kudu/src/test/resources/kuduincrementalstepscaninputoperator.properties
index 8f41c63..ddf5068 100644
--- a/contrib/src/test/resources/kuduoutputoperator.properties
+++ b/kudu/src/test/resources/kuduincrementalstepscaninputoperator.properties
@@ -17,6 +17,11 @@
 # under the License.
 #
 
-masterhosts=192.168.2.141:7051,192.168.2.133:7051
+masterhosts=192.168.1.41:7051
 tablename=unittests
-pojoclassname=org.apache.apex.malhar.contrib.kudu.UnitTestTablePojo
\ No newline at end of file
+templatequerystring=SELECT * FROM unittests WHERE introwkey > 1234 AND timestamprowkey >= :lowerbound AND timestamprowkey < :upperbound
+templatequerylowerboundparamname = :lowerbound
+templatequeryupperboundparamname = :upperbound
+templatequeryseedvalue = 1503001033219
+templatequeryincrementalvalue = 120000
+
diff --git a/contrib/src/test/resources/kuduoutputoperator.properties b/kudu/src/test/resources/kuduoutputoperator.properties
similarity index 87%
rename from contrib/src/test/resources/kuduoutputoperator.properties
rename to kudu/src/test/resources/kuduoutputoperator.properties
index 8f41c63..1c61d38 100644
--- a/contrib/src/test/resources/kuduoutputoperator.properties
+++ b/kudu/src/test/resources/kuduoutputoperator.properties
@@ -17,6 +17,6 @@
 # under the License.
 #
 
-masterhosts=192.168.2.141:7051,192.168.2.133:7051
+masterhosts=192.168.1.41:7051
 tablename=unittests
-pojoclassname=org.apache.apex.malhar.contrib.kudu.UnitTestTablePojo
\ No newline at end of file
+pojoclassname=org.apache.apex.malhar.kudu.UnitTestTablePojo
diff --git a/kudu/src/test/resources/log4j.properties b/kudu/src/test/resources/log4j.properties
new file mode 100644
index 0000000..972d14d
--- /dev/null
+++ b/kudu/src/test/resources/log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.kudu=WARN
+log4j.logger.org.apache.apex.malhar.kudu=DEBUG
diff --git a/pom.xml b/pom.xml
index 298576d..d824587 100644
--- a/pom.xml
+++ b/pom.xml
@@ -214,6 +214,7 @@
     <module>library</module>
     <module>contrib</module>
     <module>kafka</module>
+    <module>kudu</module>
     <module>examples</module>
   </modules>
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].