You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/09/27 14:56:29 UTC
[apex-malhar] branch master updated:
APEXMALHAR-2472.KuduInputOperator Initial implementation and addressing
review comments by Thomas, Vlad and Pramod
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new 20c407b APEXMALHAR-2472.KuduInputOperator Initial implementation and addressing review comments by Thomas,Vlad and Pramod
20c407b is described below
commit 20c407b145a063f7d541164bde732fbd7fd68a77
Author: Ananth <an...@gmail.com>
AuthorDate: Tue Sep 26 04:14:17 2017 +1000
APEXMALHAR-2472.KuduInputOperator Initial implementation and addressing review comments by Thomas,Vlad and Pramod
---
contrib/pom.xml | 6 -
kudu/pom.xml | 168 ++++
.../malhar/kudu/sqlparser/KuduSQLExpression.g4 | 169 ++++
.../malhar/kudu/AbstractKuduInputOperator.java | 1035 ++++++++++++++++++++
.../malhar}/kudu/AbstractKuduOutputOperator.java | 17 +-
.../apex/malhar}/kudu/ApexKuduConnection.java | 22 +-
.../apex/malhar}/kudu/BaseKuduOutputOperator.java | 8 +-
.../kudu/IncrementalStepScanInputOperator.java | 261 +++++
.../malhar/kudu/InputOperatorControlTuple.java | 81 ++
.../apex/malhar}/kudu/KuduExecutionContext.java | 2 +-
.../apache/apex/malhar}/kudu/KuduMutationType.java | 2 +-
.../org/apache/apex/malhar/kudu/package-info.java | 21 +-
.../partitioner/AbstractKuduInputPartitioner.java | 251 +++++
.../kudu/partitioner/KuduOneToManyPartitioner.java | 86 ++
.../kudu/partitioner/KuduOneToOnePartitioner.java | 71 ++
.../partitioner/KuduPartitionScanStrategy.java | 14 +-
.../apex/malhar/kudu/partitioner/package-info.java | 20 +-
.../kudu/scanner/AbstractKuduPartitionScanner.java | 242 +++++
.../KuduPartitionConsistentOrderScanner.java | 107 ++
.../scanner/KuduPartitionRandomOrderScanner.java | 67 ++
.../scanner/KuduPartitionScanAssignmentMeta.java | 120 +++
.../kudu/scanner/KuduPartitionScannerCallable.java | 204 ++++
.../malhar/kudu/scanner/KuduRecordWithMeta.java | 88 ++
.../malhar/kudu/scanner/KuduScanOrderStrategy.java | 14 +-
.../apex/malhar/kudu/scanner/package-info.java | 22 +-
.../KuduSQLExpressionErrorListener.java | 65 ++
.../sqltranslator/KuduSQLParseTreeListener.java | 601 ++++++++++++
.../malhar/kudu/sqltranslator/KuduSQLParser.java | 49 +
.../SQLToKuduPredicatesTranslator.java | 129 +++
.../malhar/kudu/sqltranslator/package-info.java | 21 +-
.../malhar/kudu/AbstractKuduInputOperatorTest.java | 86 ++
.../kudu/IncrementalStepScanInputOperatorTest.java | 117 +++
.../apex/malhar/kudu/KuduClientTestCommons.java | 260 +++++
.../KuduCreateUpdateDeleteOutputOperatorTest.java | 154 +--
.../apex/malhar/kudu/KuduInputOperatorCommons.java | 212 ++++
.../malhar}/kudu/SimpleKuduOutputOperator.java | 12 +-
.../kudu/UnitTestStepwiseScanInputOperator.java | 32 +-
.../apex/malhar}/kudu/UnitTestTablePojo.java | 18 +-
.../AbstractKuduInputPartitionerTest.java | 113 +++
.../partitioner/KuduOneToManyPartitionerTest.java | 82 ++
.../partitioner/KuduOneToOnePartitionerTest.java | 76 ++
.../scanner/AbstractKuduPartitionScannerTest.java | 98 ++
.../scanner/KuduPartitionScannerCallableTest.java | 133 +++
.../SQLToKuduPredicatesTranslatorTest.java | 156 +++
.../kudu/test/KuduClusterAvailabilityTestRule.java | 162 +++
.../malhar/kudu/test/KuduClusterTestContext.java | 21 +-
...kuduincrementalstepscaninputoperator.properties | 9 +-
.../test/resources/kuduoutputoperator.properties | 4 +-
kudu/src/test/resources/log4j.properties | 45 +
pom.xml | 1 +
50 files changed, 5499 insertions(+), 255 deletions(-)
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 3c5797c..1434c49 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -672,11 +672,5 @@
<artifactId>jackson-databind</artifactId>
<version>2.7.0</version>
</dependency>
- <dependency>
- <groupId>org.apache.kudu</groupId>
- <artifactId>kudu-client</artifactId>
- <version>1.3.0</version>
- <optional>true</optional>
- </dependency>
</dependencies>
</project>
diff --git a/kudu/pom.xml b/kudu/pom.xml
new file mode 100755
index 0000000..2b518ae
--- /dev/null
+++ b/kudu/pom.xml
@@ -0,0 +1,168 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar</artifactId>
+ <version>3.8.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-kudu</artifactId>
+ <name>Apache Apex Malhar Kudu Support</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <checkstyle.console>false</checkstyle.console>
+ <kudu-client-version>1.5.0</kudu-client-version>
+ <disruptor-queue-conversant-media-version>1.2.10</disruptor-queue-conversant-media-version>
+ <antlr4-runtime-version>4.7</antlr4-runtime-version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-maven-plugin</artifactId>
+ <version>4.7</version>
+ <executions>
+ <execution>
+ <id>kudusqlantlr4parsergen</id>
+ <goals>
+ <goal>antlr4</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
+ <excludes>**/KuduSQLExpression*</excludes> <!--To account for violations from the Antlr autogenerated code -->
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpression.tokens</exclude>
+ <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpressionLexer.tokens</exclude>
+ <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpressionLexer.java</exclude>
+ <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpressionListener.java</exclude>
+ <exclude>src/main/java/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpressionParser.java</exclude>
+ <exclude>src/main/antlr4/KuduSQLExpression.g4</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${basedir}/target/generated-sources/antlr4</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.core.version}</version>
+ <type>jar</type>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu-client-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ <version>${antlr4-runtime-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.conversantmedia</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor-queue-conversant-media-version}</version>
+ <classifier>jdk7</classifier><!-- Set classifier to jdk8 when malhar switches to JDK 8 -->
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.8.47</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.7.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>1.7.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-junit</artifactId>
+ <version>2.0.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/kudu/src/main/antlr4/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpression.g4 b/kudu/src/main/antlr4/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpression.g4
new file mode 100644
index 0000000..ec3ea79
--- /dev/null
+++ b/kudu/src/main/antlr4/org/apache/apex/malhar/kudu/sqlparser/KuduSQLExpression.g4
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+grammar KuduSQLExpression;
+
+kudusqlexpression: (WHITESPACE)* SELECT (WHITESPACE)+ columnnamesselect (tableclause)? (whereclause)? (endstatement)? (withoptionsclause)? (WHITESPACE)*;
+
+columnnamesselect : '*' # ALL_COLUMNS_SELECT_EXP
+ | colnameselectexpression # COLNAME_BASED_SELECT_EXP
+ ;
+
+colnameselectexpression : colnameselectexpression (WHITESPACE)* COMMA (WHITESPACE)* colnameselectexpression # SELECT_COMPLEX_COL_EXPRESSION
+ | idorcolumnname (WHITESPACE)+ AS (WHITESPACE)+ ID # SELECT_ALIAS_USED
+ | idorcolumnname # SELECT_ID_ONLY_USED_AS_COLUMN_NAME
+ ;
+
+idorcolumnname: ID
+ | '\'' (WHITESPACE)* FROM (WHITESPACE)* '\''
+ | '\'' (WHITESPACE)* WHERE (WHITESPACE)* '\''
+ | '\'' (WHITESPACE)* USING (WHITESPACE)* '\''
+ | '\'' (WHITESPACE)* OPTIONS (WHITESPACE)* '\''
+ | '\'' (WHITESPACE)* NOT (WHITESPACE)* '\''
+ | '\'' (WHITESPACE)* NULL (WHITESPACE)* '\''
+ | '\'' (WHITESPACE)* CONTROLTUPLE_MESSAGE (WHITESPACE)* '\''
+ | '\'' (WHITESPACE)* READ_SNAPSHOT_TIME (WHITESPACE)* '\''
+ ;
+
+tableclause : (WHITESPACE)+ FROM (WHITESPACE)+ ID;
+
+whereclause : (WHITESPACE)+ WHERE (WHITESPACE)+ columnfilterexpression;
+
+withoptionsclause: (WHITESPACE)+ USING (WHITESPACE)+ OPTIONS (WHITESPACE)+ keyvaluepair;
+
+
+keyvaluepair:
+ READ_SNAPSHOT_TIME (WHITESPACE)* EQUAL_TO (WHITESPACE)* INT #SET_READ_SNAPSHOT_TIME
+ | CONTROLTUPLE_MESSAGE (WHITESPACE)* EQUAL_TO (WHITESPACE)* STRINGVAL #SET_CONTROL_TUPLE_MSG
+ | keyvaluepair (WHITESPACE)+ COMMA (WHITESPACE)* keyvaluepair #SET_MULTI_OPTIONS
+ ;
+
+columnfilterexpression: columnfilterexpression (WHITESPACE)* AND (WHITESPACE)* columnfilterexpression #FILTER_COMPLEX_FILTER_EXP
+ | idorcolumnname (WHITESPACE)* comparisionoperator (WHITESPACE)* anyvalue #FILTER_COMPARISION_EXP
+ | idorcolumnname (WHITESPACE)+ IS (WHITESPACE)+ NOT (WHITESPACE)+ NULL #IS_NOT_NULL_FILTER_EXP
+ | idorcolumnname (WHITESPACE)+ IS (WHITESPACE)+ NULL #IS_NULL_FILTER_EXP
+ | idorcolumnname (WHITESPACE)+ IN (WHITESPACE)+ listofanyvalue #IN_FILTER_EXP
+ ;
+
+
+comparisionoperator: EQUAL_TO
+ | LESSER_THAN
+ | LESSER_THAN_OR_EQUAL
+ | GREATER_THAN
+ | GREATER_THAN_OR_EQUAL
+ ;
+anyvalue: bool
+ | numval
+ | doubleval
+ | floatval
+ | stringval
+ ;
+
+bool: TRUE
+ | FALSE;
+
+numval: INT;
+
+doubleval: INT '.' INT 'd'
+ | '.' INT 'd'
+ ;
+
+floatval: INT '.' INT 'f'
+ | '.' INT 'f'
+ ;
+
+stringval: STRINGVAL;
+
+listofanyvalue: listofbools
+ | listofnums
+ | listoffloats
+ | listofdoubles
+ | listofstrings
+
+ ;
+
+listofbools: LPAREN (WHITESPACE)* bool (WHITESPACE)* (COMMA (WHITESPACE)* bool)* (WHITESPACE)* RPAREN;
+listofnums: LPAREN (WHITESPACE)* numval (WHITESPACE)* (COMMA (WHITESPACE)* numval)* (WHITESPACE)* RPAREN;
+listoffloats: LPAREN (WHITESPACE)* floatval (WHITESPACE)* (COMMA (WHITESPACE)* floatval)* (WHITESPACE)* RPAREN;
+listofdoubles: LPAREN (WHITESPACE)* doubleval (WHITESPACE)* (COMMA (WHITESPACE)* doubleval)* (WHITESPACE)* RPAREN;
+listofstrings: LPAREN (WHITESPACE)* stringval (WHITESPACE)* (COMMA (WHITESPACE)* stringval)* (WHITESPACE)* RPAREN;
+
+endstatement: SEMICOLON;
+
+SELECT: [Ss][Ee][Ll][Ee][Cc][Tt];
+
+AS: [Aa][Ss];
+
+FROM: [Ff][Rr][Oo][Mm];
+
+WHERE: [Ww][Hh][Ee][Rr][Ee];
+
+NOT: [Nn][Oo][Tt];
+
+NULL: [Nn][Uu][Ll][Ll];
+
+IN: [Ii][Nn];
+
+IS: [Ii][Ss];
+
+AND: [Aa][Nn][Dd];
+
+TRUE: [Tt][Rr][Uu][Ee];
+
+FALSE: [Ff][Aa][Ll][Ss][Ee];
+
+COMMA: ',';
+
+SEMICOLON: ';';
+
+EQUAL_TO: '=';
+
+GREATER_THAN: '>';
+
+LESSER_THAN: '<';
+
+GREATER_THAN_OR_EQUAL: '>=';
+
+LESSER_THAN_OR_EQUAL: '<=';
+
+LPAREN: '[';
+
+RPAREN: ']';
+
+USING: [Uu][Ss][Ii][Nn][Gg];
+
+OPTIONS: [Oo][Pp][Tt][Ii][Oo][Nn][Ss];
+
+READ_SNAPSHOT_TIME: [Rr][Ee][Aa][Dd] '_' [Ss][Nn][Aa][Pp][Ss][Hh][Oo][Tt]'_'[Tt][Ii][Mm][Ee];
+
+CONTROLTUPLE_MESSAGE: [Cc][Oo][Nn][Tt][Rr][Oo][Ll][Tt][Uu][Pp][Ll][Ee]'_'[Mm][Ee][Ss][Ss][Aa][Gg][Ee];
+
+ID: ('a'..'z' | 'A'..'Z' | '_') ('a'..'z' | 'A'..'Z' | '_' | '0'..'9')*;
+
+INT: '0' .. '9'+
+ ;
+
+WHITESPACE : ('\t' | ' ' | '\r' | '\n'| '\u000C');
+
+STRINGVAL: '"' (ESC|.)*? '"';
+
+fragment
+ESC: '\\"' | '\\\\' ;
+
+
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperator.java
new file mode 100644
index 0000000..14ea6b7
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperator.java
@@ -0,0 +1,1035 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.validation.constraints.NotNull;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.ControlAwareDefaultOutputPort;
+import org.apache.apex.malhar.kudu.partitioner.AbstractKuduInputPartitioner;
+import org.apache.apex.malhar.kudu.partitioner.KuduOneToManyPartitioner;
+import org.apache.apex.malhar.kudu.partitioner.KuduOneToOnePartitioner;
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionConsistentOrderScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionRandomOrderScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.scanner.KuduRecordWithMeta;
+import org.apache.apex.malhar.kudu.scanner.KuduScanOrderStrategy;
+import org.apache.apex.malhar.kudu.sqltranslator.KuduSQLParseTreeListener;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduTable;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.util.PojoUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <p>Abstract Kudu Input Operator that can be used to stream POJO representation of a Kudu table row by providing a
+ * SQL expression as an input to the operator. The SQL expression is fed into the operator by implementing the
+ * {@link AbstractKuduInputOperator#getNextQuery()}. The query processing is a continuous process wherein the
+ * method is invoked as soon as the operator is done processing the current query</p>
+ *
+ * <p>
+ * The following are the main features of the Kudu input operator
+ * <ol>
+ * <li>The scans are performed using a SQL expression. The SQL expression is limited to the constructs
+ * provided by the Kudu scan engine. As an example not equal to expressions are not allowed as predicates</li>
+ * <li>The SQL expression allows for scanning with options that allow for setting the read snap shot time
+ * and/or setting the message that will be sent as part of the control tuple after the query is sent to the
+ * downstream operators</li>
+ * <li>The operator allows for a custom control tuple to be sent when the query data is streamed to the downstream
+ * operators</li>
+ * <li>The operator is templatised.
+ * <ol>
+ * <li>The first template variable represents the POJO class that represents the single
+ * row of a kudu table</li>
+ * <li>The second template parameter represents the control tuple message that is sent to downstream operators
+ * once a query is complete.</li>
+ * </ol>
+ * </li>
+ * <li>
+ * The input operator can be configured as a fault tolerant input operator. Configuring for fault tolerance
+ * implies that the scan is slower but resilient for tablet server failures. See
+ * {@link AbstractKuduInputOperator#setFaultTolerantScanner(boolean)}</li>
+ * <li>The number of partitions can be set via typical apex configuration or via the method
+ * {@link AbstractKuduInputOperator#setNumberOfPartitions(int)}. The configuration value is overridden by the
+ * API method if both are defined.</li>
+ * <li>
+ * A partition scan strategy can be configured. There are two types of supported scan partitioners. Default is
+ * MANY_TABLETS_PER_OPERATOR.
+ * <ol>
+ * <li>MANY_TABLETS_PER_OPERATOR: Many kudu tablets are mapped to one Apex partition while scanning. See
+ * {@link KuduOneToManyPartitioner}</li>
+ * <li>ONE_TABLET_PER_OPERATOR: One Kudu tablet to one Apex partition. See
+ * {@link KuduOneToOnePartitioner}</li>
+ * </ol>
+ * </li>
+ * <li>A scan order strategy can also be set. This configuration allows for two types of ordering. The default
+ * is RANDOM_ORDER_SCANNER.
+ * <ol>
+ * <li>CONSISTENT_ORDER_SCANNER: Ensures the scans are consistent across checkpoint and restarts. See
+ * {@link KuduPartitionConsistentOrderScanner}</li>
+ * <li>RANDOM_ORDER_SCANNER: Rows from different kudu tablets are streamed in parallel in different threads.
+ * Choose this option for the highest throughput but with the downside of no ordering guarantees across
+ * multiple launches of the operator. See {@link KuduPartitionRandomOrderScanner}</li>
+ * </ol>
+ * </li>
+ * <li>Reads from Kudu tablets happen on a different thread than the main operator thread. The operator uses the
+ * DisruptorBlockingQueue as the buffer to achieve optimal performance and high throughput. The data scanned is
+ * sent via this buffer to the kudu input operator. Depending on the configuration, there can be multiple threads
+ * scanning multiple kudu tablets in parallel. The input operator allows to fine tune the buffering strategy.
+ * Set the CPU policy to SPINNING to
+ * get the highest performance. However this setting results in showing a 100% CPU busy state on the host where
+ * it is spinning instead of doing any useful work until there is data processing required</li>
+ * <li>The Operator allows for exactly once semantics when resuming from a checkpoint after a crash recovery/
+ * restart. For the exactly once semantics, the operator developer who is extending the Abstract input
+ * operator will need to override the method
+ * {@link AbstractKuduInputOperator#isAllowedInReconcilingWindow(KuduRecordWithMeta)}. Return true if the
+ * row needs to be streamed downstream. Note that exactly once semantics requires a lot more
+ * complexity in a truly distributed framework. Note that this method is called only for one window when
+ * the operator is in a reconciling mode in the last window that it was operating in just before a crash
+ * or a shutdown of the application.</li>
+ * <li>The query processing is done by operator one query after another and is independent of the checkpoint
+ * boundary. This essentially means that any given instance of time all physical instances of the operator
+ * are not processing the same query. Downstream operators if using a unifier need to be aware of this. The
+ * control tuple sent downstream can be used to identify the boundaries of the operator query processing
+ * boundaries.
+ * {@link AbstractKuduInputOperator#getNextQuery()} method is used by the child classes of this operator to
+ * define the next SQL expression that needs to be processed.</li>
+ * <li>Connection to the Kudu cluster is managed by specifying a
+ * {@link ApexKuduConnection.ApexKuduConnectionBuilder}. Use this builder to fine tune various connection configs
+ * to ensure that the optimal resources are set for the scanner</li>
+ * <li>The tuple throughput can be controlled by using the
+ * {@link AbstractKuduInputOperator#setMaxTuplesPerWindow(int)} to control the rate at which the
+ * tuples are emitted in any given window</li>
+ * <li>Note that the default window data manager is the FS based window data manger. Changing this to Noop or
+ * other dummy implementations would result in 'EXACTLY ONCE' semantics void.</li>
+ * </ol>
+ * </p>
+ * @since 3.8.0
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractKuduInputOperator<T,C extends InputOperatorControlTuple> implements InputOperator,
+ Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener,
+ Partitioner<AbstractKuduInputOperator>, StatsListener
+{
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduInputOperator.class);
+
+ private KuduPartitionScanStrategy partitionScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+
+ private KuduScanOrderStrategy scanOrderStrategy = KuduScanOrderStrategy.RANDOM_ORDER_SCANNER;
+
+ protected ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionInfo;
+
+ /** Represents the PIE of the total tablets that are distributed across all instances of the operator. Note that
+ * this <b>DOES NOT</b> represent the work distribution for a given query for reasons related to churn of
+ * partitioning. Any query that is processed is allotted a subset of this pie with the possibility of
+ * all the query utilizing all of the pie for computing the query.
+ */
+ private List<KuduPartitionScanAssignmentMeta> partitionPieAssignment = new ArrayList<>();
+
+ protected Class<T> clazzForResultObject;
+
+ private int numberOfPartitions = -1; // set to -1 so the partition logic can detect whether the user explicitly set this value
+
+ protected String tableName;
+
+ // performance fine tuning params
+
+ /**
+ * Set this to false to increase throughput but at the risk of being Kudu tablet failure dependent
+ */
+ private boolean isFaultTolerantScanner = true;
+
+ private SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
+
+ private int bufferCapacity = 8192;
+
+ private int maxTuplesPerWindow = -1;
+
+
+ // current query fields
+ private String currentQueryBeingProcessed;
+
+ private Map<String,String> optionsEnabledForCurrentQuery;
+
+ /**
+ * Used to track the number of scantokens that are valid for this operator instance and this query. Used
+ * to track the current query processing status and whether the operator is ready to accept the next query.
+ */
+ private int plannedSegmentsForCurrentQuery = 0;
+
+ /**
+ * Used to track the completion status of each scantoken Kudu partition that was assigned for this query and this
+ * physical instance of the operator.
+ */
+ private Map<KuduPartitionScanAssignmentMeta,Boolean> currentQueryCompletionStatus = new HashMap<>();
+
+ private boolean allScansCompleteForThisQuery = true;
+
+ private boolean isPartitioned = false;
+
+ @NotNull
+ private WindowDataManager windowDataManager = new FSWindowDataManager();
+
+ private transient long currentWindowId;
+
+ private transient long reconcilingPhaseWindowId;
+
+ private transient boolean isCurrentlyInSafeMode;
+
+ private transient boolean isCurrentlyInReconcilingMode;
+
+ private transient Map<KuduPartitionScanAssignmentMeta,Long> windowManagerDataForScans;
+
+ @JsonIgnore
+ protected transient AbstractKuduInputPartitioner partitioner;
+
+ @JsonIgnore
+ protected transient AbstractKuduPartitionScanner<T,C> scanner;
+
+ private transient DisruptorBlockingQueue<KuduRecordWithMeta<T>> buffer;
+
+ private transient Map<String,String> kuduColNameToPOJOFieldNameMap;
+
+ private transient Map<String,ColumnSchema> kuduColNameToSchemaMapping;
+
+ private transient int currentWindowTupleCount = 0;
+
+ public final transient ControlAwareDefaultOutputPort<T> outputPort = new ControlAwareDefaultOutputPort<>();
+
+ public final transient DefaultOutputPort<String> errorPort = new DefaultOutputPort<>();
+
+ public AbstractKuduInputOperator()
+ {
+ // Used by the Kryo cloning process.
+ // Other usages would need to set the table name, class and Kudu connection using setters.
+ // Kryo would take care of these values as it does a deep copy.
+ }
+
+ public AbstractKuduInputOperator(ApexKuduConnection.ApexKuduConnectionBuilder kuduConnectionInfo,
+ Class<T> clazzForPOJO) throws Exception
+ {
+ checkNotNull(clazzForPOJO," Class definition for POJO cannot be null");
+ checkNotNull(kuduConnectionInfo, "Kudu connection info cannot be null");
+ apexKuduConnectionInfo = kuduConnectionInfo;
+ clazzForResultObject = clazzForPOJO;
+ tableName = kuduConnectionInfo.tableName;
+ }
+
+ /**
+ * Implement this method to get the next query that needs to be processed by this instance of the operator. Note
+ * that the given query is distributed across all physical instances of the operator. It is entirely possible that
+ * each physical instance of the operator is processing a different query at any given instance of time because
+ * the rows that need to be scanned might not be equally spread across all kudu partitions.
+ * As long as all instances of the physical operator get the same sequence of queries as part of the implementation
+ * of this method, all the physical operator instances will ensure the same sequence of processing of the queries.
+ * @return The next query string that needs to be processed.
+ */
+ protected abstract String getNextQuery();
+
+
+ /**
+ * Implements the logic to process a given query. Note that this method is invoked from two states.
+ * One state when the Operator is resuming from a checkpointed state and the second when a new query needs to be
+ * processed as part of the normal execution processing.
+ */
+ protected boolean processForQueryString(String queryExpression)
+ {
+ LOG.info("Processing query " + queryExpression);
+ SQLToKuduPredicatesTranslator parsedQuery = null;
+ try {
+ parsedQuery = new SQLToKuduPredicatesTranslator(
+ queryExpression,new ArrayList<ColumnSchema>(kuduColNameToSchemaMapping.values()));
+ parsedQuery.parseKuduExpression();
+ } catch (Exception e) {
+ LOG.error("Could not parse the SQL expression/query " + e.getMessage(),e);
+ errorPort.emit(queryExpression);
+ return false;
+ }
+ if (parsedQuery.getErrorListener().isSyntaxError()) {
+ LOG.error(" Query is an invalid query " + queryExpression);
+ errorPort.emit(queryExpression);
+ return false;
+ }
+ if (!parsedQuery.getKuduSQLParseTreeListener().isSuccessfullyParsed()) {
+ LOG.error(" Query could not be successfully parsed instead of being syntactically correct " + queryExpression);
+ errorPort.emit(queryExpression);
+ return false;
+ }
+ Map<String,Object> setters = extractSettersForResultObject(parsedQuery);
+ try {
+ currentQueryBeingProcessed = queryExpression;
+ optionsEnabledForCurrentQuery.clear();
+ optionsEnabledForCurrentQuery.putAll(parsedQuery.getKuduSQLParseTreeListener().getOptionsUsed());
+ plannedSegmentsForCurrentQuery = scanner.scanAllRecords(parsedQuery,setters);
+ } catch (IOException e) {
+ LOG.error("Error while scanning the kudu segments " + e.getMessage(), e);
+ errorPort.emit(queryExpression);
+ return false;
+ }
+ LOG.info("Query" + queryExpression + " submitted for scanning");
+ return true;
+ }
+
+ /**
+ * Resets the processing state of data structures that are applicable at a query level.
+ */
+ protected boolean processForNextQuery()
+ {
+ LOG.info("Clearing data structures for processing query " + currentQueryBeingProcessed);
+ windowManagerDataForScans.clear();
+ currentQueryCompletionStatus.clear();
+ return processForQueryString(getNextQuery());
+ }
+
+
+ /**
+ * Scans the metadata for the kudu table that this operator is scanning for and
+ * returns back the mapping for the kudu column name to the ColumnSchema metadata definition.
+ * Note that the Kudu columns names are case sensitive.
+ * @return A Map with Kudu column names as keys and value as the Column Definition.
+ * @throws Exception
+ */
+ private Map<String,ColumnSchema> buildColumnSchemaForTable() throws Exception
+ {
+ if (kuduColNameToSchemaMapping == null) {
+ ApexKuduConnection connectionForMetaDataScan = apexKuduConnectionInfo.build();
+ KuduTable table = connectionForMetaDataScan.getKuduTable();
+ List<ColumnSchema> tableColumns = table.getSchema().getColumns();
+ connectionForMetaDataScan.close();
+ Map<String,ColumnSchema> columnSchemaMap = new HashMap<>();
+ for (ColumnSchema aColumn: tableColumns) {
+ columnSchemaMap.put(aColumn.getName(),aColumn);
+ }
+ kuduColNameToSchemaMapping = columnSchemaMap;
+ }
+ return kuduColNameToSchemaMapping;
+ }
+
+ /**
+ * Method responsible for sending a control tuple whenever a new query is about to be processed.
+ */
+ protected void sendControlTupleForNewQuery()
+ {
+ C startNewQueryControlTuple = getControlTupleForNewQuery();
+ if (startNewQueryControlTuple != null) {
+ outputPort.emitControl(startNewQueryControlTuple);
+ }
+ }
+
+ /**
+ * Override this method to send a valid Control tuple to the downstream operator instances
+ * @return The control tuple that needs to be sent at the beginning of the query
+ */
+ protected C getControlTupleForNewQuery()
+ {
+ return null;// default is null; Can be overridden in the child classes if custom representation is needed
+ }
+
+ /**
+ * Fetches values from the buffer. This also checks for the limit set as the maximum tuples that can be
+ * sent downstream in a single window.
+ * @return Returns the next Kudu row along with its metadata that needs to be processed for filters
+ */
+
+ protected KuduRecordWithMeta<T> processNextTuple()
+ {
+ boolean emitTuple = true;
+ KuduRecordWithMeta<T> entryFetchedFromBuffer = null;
+ if (maxTuplesPerWindow != -1) {
+ // we have an explicit upper window. Hence we need to check and then emit
+ if (currentWindowTupleCount >= maxTuplesPerWindow) {
+ emitTuple = false;
+ }
+ }
+ if ( emitTuple) {
+ try {
+ entryFetchedFromBuffer = buffer.poll(100L, TimeUnit.MICROSECONDS);
+ } catch (InterruptedException e) {
+ LOG.debug("No entry available in the buffer " + e.getMessage(), e);
+ }
+ }
+ return entryFetchedFromBuffer;
+ }
+
+ /***
+ * Used to filter out tuples read from the Kudu scan if it is being replayed in the windows before the
+ * reconciling window phase. This method also invokes the filter check in case the current window is in the
+ * reconciling window phase
+ * @param recordWithMeta
+ */
+ protected void filterTupleBasedOnCurrentState(KuduRecordWithMeta<T> recordWithMeta)
+ {
+ boolean filter = false;
+ if (isCurrentlyInSafeMode) {
+ filter = true;
+ }
+ KuduPartitionScanAssignmentMeta currentRecordMeta = recordWithMeta.getTabletMetadata();
+ long currentPositionInScan = recordWithMeta.getPositionInScan();
+ if (windowManagerDataForScans.containsKey(currentRecordMeta)) {
+ long counterLimitForThisMeta = windowManagerDataForScans.get(currentRecordMeta);
+ if ( currentPositionInScan <= counterLimitForThisMeta) {
+ // This is the case of a replay and hence do not emit
+ filter = true;
+ } else {
+ windowManagerDataForScans.put(currentRecordMeta,currentPositionInScan);
+ }
+ } else {
+ windowManagerDataForScans.put(currentRecordMeta,currentPositionInScan);
+ }
+ if ( isCurrentlyInReconcilingMode) {
+ // check to see if we can emit based on the business logic in a reconciling window processing state
+ if ( !isAllowedInReconcilingWindow(recordWithMeta)) {
+ filter = true;
+ }
+ }
+ if (!filter) {
+ outputPort.emit(recordWithMeta.getThePayload());
+ currentWindowTupleCount += 1;
+ }
+ }
+
+ /**
+ * Sends either control tuples or data tuples based on the values being fetched from the buffer.
+ */
+ @Override
+ public void emitTuples()
+ {
+ if (allScansCompleteForThisQuery) {
+ if (processForNextQuery() ) {
+ sendControlTupleForNewQuery();
+ allScansCompleteForThisQuery = false;
+ }
+ return;
+ }
+ KuduRecordWithMeta<T> entryFetchedFromBuffer = processNextTuple();
+ if (entryFetchedFromBuffer != null) {
+ if ( (!entryFetchedFromBuffer.isEndOfScanMarker()) &&
+ (!entryFetchedFromBuffer.isBeginScanMarker())) {
+ // we have a valid record
+ filterTupleBasedOnCurrentState(entryFetchedFromBuffer);
+ return;
+ }
+ if (entryFetchedFromBuffer.isEndOfScanMarker()) {
+ processForEndScanMarker(entryFetchedFromBuffer);
+ sendControlTupleForEndQuery();
+ }
+ if (entryFetchedFromBuffer.isBeginScanMarker()) {
+ processForBeginScanMarker(entryFetchedFromBuffer);
+ }
+ }
+ }
+
+ /**
+ * Sends a control tuple to the downstream operators. This method checks to see if the user wishes to send a custom
+ * control tuple. If yes, a control tuple that the user chooses to send is streamed to the downstream operators.
+ * See {@link AbstractKuduInputOperator#getControlTupleForEndQuery()}. If this is null but the user configured a
+ * control tuple to be sent as part of the SQL expression that is given as input, a control tuple will automatically
+ * be generated by this operator containing the message that the user set at the time of the Query definition.
+ */
+ protected void sendControlTupleForEndQuery()
+ {
+ if ( currentQueryBeingProcessed == null) {
+ return; //
+ }
+ if (!allScansCompleteForThisQuery) {
+ return;
+ }
+ C endControlTuple = getControlTupleForEndQuery();
+ if (endControlTuple != null) {
+ outputPort.emitControl(endControlTuple);
+ } else {
+ if (optionsEnabledForCurrentQuery.containsKey(KuduSQLParseTreeListener.CONTROLTUPLE_MESSAGE)) {
+ InputOperatorControlTuple controlTuple = new InputOperatorControlTuple();
+ controlTuple.setBeginNewQuery(false);
+ controlTuple.setEndCurrentQuery(true);
+ controlTuple.setQuery(currentQueryBeingProcessed);
+ controlTuple.setControlMessage(
+ optionsEnabledForCurrentQuery.get(KuduSQLParseTreeListener.CONTROLTUPLE_MESSAGE));
+ outputPort.emitControl(controlTuple);
+ }
+ }
+ }
+
+ protected C getControlTupleForEndQuery()
+ {
+ return null;// default is null; Can be overridden in the child classes if custom representation is needed
+ }
+
+ protected void processForBeginScanMarker(KuduRecordWithMeta<T> entryFetchedFromBuffer)
+ {
+ currentQueryCompletionStatus.put(entryFetchedFromBuffer.getTabletMetadata(), false);
+ }
+
+ /**
+ * Used to see if all the segments that are planned for the current query and the current physical instance of the
+ * operator are done in terms of streaming tuples to the downstream operators.
+ * @param entryFetchedFromBuffer
+ */
+ protected void processForEndScanMarker(KuduRecordWithMeta<T> entryFetchedFromBuffer)
+ {
+ Boolean currentStatus = currentQueryCompletionStatus.get(entryFetchedFromBuffer.getTabletMetadata());
+ if (currentStatus == null) {
+ LOG.error(" End scan marker cannot be precede a Begin Scan marker ");
+ }
+ currentQueryCompletionStatus.put(entryFetchedFromBuffer.getTabletMetadata(), true);
+ if ( plannedSegmentsForCurrentQuery == 0 ) {
+ allScansCompleteForThisQuery = true;
+ return;
+ }
+ boolean areAllScansComplete = true;
+ if (currentQueryCompletionStatus.size() != plannedSegmentsForCurrentQuery) {
+ return;
+ }
+ for (KuduPartitionScanAssignmentMeta aMeta: currentQueryCompletionStatus.keySet()) {
+ if (!currentQueryCompletionStatus.get(aMeta)) {
+ areAllScansComplete = false;
+ }
+ }
+ if (areAllScansComplete) {
+ allScansCompleteForThisQuery = true;
+ }
+ }
+
+ /**
+ * This method is used to give the user of the operator control to decide how to derive exactly-once
+ * semantics. Note that this check is invoked in a reconciling window when the operator is resuming from a crash.
+ * This check happens only in this window and in no other window.
+ * @param extractedRecord
+ * @return true if the tuple needs to be sent to downstream. False if one does not want to stream this tuple
+ * downstream. Note that this check is invoked in a reconciling window when the operator is resuming from a crash.
+ */
+ protected boolean isAllowedInReconcilingWindow(KuduRecordWithMeta<T> extractedRecord)
+ {
+ return true;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowTupleCount = 0;
+ currentWindowId = windowId;
+ if ( currentWindowId != Stateless.WINDOW_ID) {
+ if (currentWindowId > reconcilingPhaseWindowId) {
+ isCurrentlyInSafeMode = false;
+ isCurrentlyInReconcilingMode = false;
+ }
+ if (currentWindowId == reconcilingPhaseWindowId) {
+ isCurrentlyInReconcilingMode = true;
+ isCurrentlyInSafeMode = false;
+ }
+ if (currentWindowId < reconcilingPhaseWindowId) {
+ isCurrentlyInReconcilingMode = false;
+ isCurrentlyInSafeMode = true;
+ }
+ }
+ LOG.info(" Current processing mode states Safe Mode = " + isCurrentlyInSafeMode + " Reconciling mode = "
+ + isCurrentlyInReconcilingMode);
+ LOG.info(" Current window ID = " + currentWindowId + " reconciling window ID = " + reconcilingPhaseWindowId);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ try {
+ windowDataManager.save(windowManagerDataForScans,currentWindowId);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not persist the window Data manager on end window boundary " +
+ e.getMessage(),e);
+ }
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ if (windowDataManager != null) {
+ windowDataManager.setup(context);
+ }
+ try {
+ buildColumnSchemaForTable();
+ } catch (Exception e) {
+ throw new RuntimeException("Error while trying to build the schema definition for the Kudu table " + tableName,
+ e);
+ }
+ windowManagerDataForScans = new HashMap<>();
+ optionsEnabledForCurrentQuery = new HashMap<>();
+ initPartitioner();
+ initBuffer();
+ // Scanner can only be initialized after initializing the partitioner
+ initScanner(); // note that this is not streaming any data yet. Only warming up the scanner ready to read data
+ initCurrentState();
+ isCurrentlyInSafeMode = false;
+ isCurrentlyInReconcilingMode = false;
+ reconcilingPhaseWindowId = Stateless.WINDOW_ID;
+ if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) &&
+ context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) <
+ windowDataManager.getLargestCompletedWindow()) {
+ isCurrentlyInSafeMode = true;
+ reconcilingPhaseWindowId = windowDataManager.getLargestCompletedWindow() + 1;
+ LOG.info("Set reconciling window ID as " + reconcilingPhaseWindowId);
+ isCurrentlyInReconcilingMode = false;
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ scanner.close();
+ }
+
+ /**
+ * Used to read an existing state from a window data manager and initialize the starting state of this operator. Note
+ * that there might be a query that was partially processed before the checkpointing process or the crash was
+ * triggered.
+ */
+ private void initCurrentState()
+ {
+ Map<KuduPartitionScanAssignmentMeta,Long> savedState = null;
+ if ( (windowManagerDataForScans.size() == 0) && (currentQueryBeingProcessed == null) ) {
+ // This is the case of an application restart possibly and hence want to get the exact state
+ try {
+ savedState = (Map<KuduPartitionScanAssignmentMeta,Long>)windowDataManager.retrieve(
+ windowDataManager.getLargestCompletedWindow());
+ } catch (IOException e) {
+ throw new RuntimeException("Error while retrieving the window manager data at the initialization phase", e);
+ } catch ( NullPointerException ex) {
+ LOG.error("Error while getting the window manager data ", ex);
+ }
+ }
+ if ( ( savedState != null) && (savedState.size() > 0 ) ) {
+ KuduPartitionScanAssignmentMeta aMeta = savedState.keySet().iterator().next(); // we have one atleast
+ currentQueryBeingProcessed = aMeta.getCurrentQuery();
+ allScansCompleteForThisQuery = false;
+ windowManagerDataForScans.putAll(savedState);
+ processForQueryString(currentQueryBeingProcessed);
+ }
+ }
+
+ @Override
+ public Collection<Partition<AbstractKuduInputOperator>> definePartitions(Collection collection,
+ PartitioningContext context)
+ {
+ initPartitioner();
+ return partitioner.definePartitions(collection,context);
+ }
+
+ @Override
+ public void partitioned(Map partitions)
+ {
+ initPartitioner();
+ partitioner.partitioned(partitions);
+ }
+
+ @Override
+ public Response processStats(BatchedOperatorStats stats)
+ {
+ Response response = new Response();
+ if ( ( partitionPieAssignment == null) || (partitionPieAssignment.size() == 0) ) {
+ response.repartitionRequired = true;
+ } else {
+ response.repartitionRequired = false;
+ }
+ return response;
+ }
+
+ private void initBuffer()
+ {
+ buffer = new DisruptorBlockingQueue<KuduRecordWithMeta<T>>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+ }
+
+ private void initPartitioner()
+ {
+ if (partitioner != null) {
+ return;
+ }
+ checkNotNull(apexKuduConnectionInfo,"Apex Kudu connection cannot be null while setting partitioner");
+ switch (partitionScanStrategy) {
+ case MANY_TABLETS_PER_OPERATOR:
+ partitioner = new KuduOneToManyPartitioner(this);
+ break;
+ case ONE_TABLET_PER_OPERATOR:
+ default:
+ partitioner = new KuduOneToOnePartitioner(this);
+ break;
+
+ }
+ }
+
+ private void initScanner()
+ {
+ switch (scanOrderStrategy) {
+ case CONSISTENT_ORDER_SCANNER:
+ setFaultTolerantScanner(true); // Consistent order scanners require them to be fault tolerant
+ scanner = new KuduPartitionConsistentOrderScanner<>(this);
+ break;
+ case RANDOM_ORDER_SCANNER:
+ default:
+ scanner = new KuduPartitionRandomOrderScanner(this);
+ break;
+ }
+ }
+
+ /***
+ *
+ * @param parsedQuery The parsed query string
+ * @return Null if the SQL expression cannot be mapped properly to the POJO fields in which case the SQL string is
+ * sent to the Error port
+ */
+ public Map<String,Object> extractSettersForResultObject(SQLToKuduPredicatesTranslator parsedQuery)
+ {
+ Map<String,String> aliasesUsedForThisQuery = parsedQuery.getKuduSQLParseTreeListener().getAliases();
+ Map<String,Object> setterMap = new HashMap<>();
+ Field[] fieldsOfThisPojo = clazzForResultObject.getDeclaredFields();
+ Set<String> allPojoFieldNamesUsed = new HashSet<>(aliasesUsedForThisQuery.values());
+ for ( Field aField : fieldsOfThisPojo) {
+ if (!allPojoFieldNamesUsed.contains(aField.getName())) {
+ LOG.error("Invalid mapping fo Kudu table column name to the POJO field name " + aField.getName());
+ return null; // SQL expression will be sent to the error port
+ }
+ }
+ for (ColumnSchema aKuduTableColumn: kuduColNameToSchemaMapping.values()) {
+ String kuduColumnName = aKuduTableColumn.getName();
+ switch ( aKuduTableColumn.getType().getDataType().getNumber()) {
+ case Common.DataType.BINARY_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetter(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName),ByteBuffer.class));
+ break;
+ case Common.DataType.BOOL_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetterBoolean(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName)));
+ break;
+ case Common.DataType.DOUBLE_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetterDouble(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName)));
+ break;
+ case Common.DataType.FLOAT_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetterFloat(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName)));
+ break;
+ case Common.DataType.INT8_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetterByte(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName)));
+ break;
+ case Common.DataType.INT16_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetterShort(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName)));
+ break;
+ case Common.DataType.INT32_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetterInt(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName)));
+ break;
+ case Common.DataType.UNIXTIME_MICROS_VALUE:
+ case Common.DataType.INT64_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetterLong(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName)));
+ break;
+ case Common.DataType.STRING_VALUE:
+ setterMap.put(kuduColumnName,PojoUtils.createSetter(clazzForResultObject,
+ aliasesUsedForThisQuery.get(kuduColumnName),String.class));
+ break;
+ case Common.DataType.UINT8_VALUE:
+ LOG.error("Unsigned int 8 not supported yet");
+ throw new RuntimeException("uint8 not supported in Kudu schema yet");
+ case Common.DataType.UINT16_VALUE:
+ LOG.error("Unsigned int 16 not supported yet");
+ throw new RuntimeException("uint16 not supported in Kudu schema yet");
+ case Common.DataType.UINT32_VALUE:
+ LOG.error("Unsigned int 32 not supported yet");
+ throw new RuntimeException("uint32 not supported in Kudu schema yet");
+ case Common.DataType.UINT64_VALUE:
+ LOG.error("Unsigned int 64 not supported yet");
+ throw new RuntimeException("uint64 not supported in Kudu schema yet");
+ case Common.DataType.UNKNOWN_DATA_VALUE:
+ LOG.error("unknown data type ( complex types ? ) not supported yet");
+ throw new RuntimeException("Unknown data type ( complex types ? ) not supported in Kudu schema yet");
+ default:
+ LOG.error("unknown type/default ( complex types ? ) not supported yet");
+ throw new RuntimeException("Unknown type/default ( complex types ? ) not supported in Kudu schema yet");
+ }
+ }
+ return setterMap;
+ }
+
+ @Override
+ public void activate(Context.OperatorContext context)
+ {
+ }
+
+ @Override
+ public void deactivate()
+ {
+
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+
+ }
+
+ public ApexKuduConnection.ApexKuduConnectionBuilder getApexKuduConnectionInfo()
+ {
+ return apexKuduConnectionInfo;
+ }
+
+ public void setApexKuduConnectionInfo(ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnection)
+ {
+ apexKuduConnectionInfo = apexKuduConnection;
+ }
+
+ public List<KuduPartitionScanAssignmentMeta> getPartitionPieAssignment()
+ {
+ return partitionPieAssignment;
+ }
+
+ public void setPartitionPieAssignment(List<KuduPartitionScanAssignmentMeta> partitionPieAssignment)
+ {
+ this.partitionPieAssignment = partitionPieAssignment;
+ }
+
+ public KuduPartitionScanStrategy getPartitionScanStrategy()
+ {
+ return partitionScanStrategy;
+ }
+
+ public void setPartitionScanStrategy(KuduPartitionScanStrategy partitionScanStrategy)
+ {
+ this.partitionScanStrategy = partitionScanStrategy;
+ }
+
+ public String getCurrentQueryBeingProcessed()
+ {
+ return currentQueryBeingProcessed;
+ }
+
+ public void setCurrentQueryBeingProcessed(String currentQueryBeingProcessed)
+ {
+ this.currentQueryBeingProcessed = currentQueryBeingProcessed;
+ }
+
+ public int getNumberOfPartitions()
+ {
+ return numberOfPartitions;
+ }
+
+ public void setNumberOfPartitions(int numberOfPartitions)
+ {
+ this.numberOfPartitions = numberOfPartitions;
+ }
+
+ public KuduScanOrderStrategy getScanOrderStrategy()
+ {
+ return scanOrderStrategy;
+ }
+
+ public void setScanOrderStrategy(KuduScanOrderStrategy scanOrderStrategy)
+ {
+ this.scanOrderStrategy = scanOrderStrategy;
+ }
+
+ public AbstractKuduInputPartitioner getPartitioner()
+ {
+ return partitioner;
+ }
+
+ public void setPartitioner(AbstractKuduInputPartitioner partitioner)
+ {
+ this.partitioner = partitioner;
+ }
+
+ public AbstractKuduPartitionScanner<T,C> getScanner()
+ {
+ return scanner;
+ }
+
+ public void setScanner(AbstractKuduPartitionScanner<T,C> scanner)
+ {
+ this.scanner = scanner;
+ }
+
+ public Class<T> getClazzForResultObject()
+ {
+ return clazzForResultObject;
+ }
+
+ public void setClazzForResultObject(Class<T> clazzForResultObject)
+ {
+ this.clazzForResultObject = clazzForResultObject;
+ }
+
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ public void setTableName(String tableName)
+ {
+ this.tableName = tableName;
+ }
+
+ public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
+ {
+ return cpuSpinPolicyForWaitingInBuffer;
+ }
+
+ public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
+ {
+ this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
+ }
+
+ public int getBufferCapacity()
+ {
+ return bufferCapacity;
+ }
+
+ public void setBufferCapacity(int bufferCapacity)
+ {
+ this.bufferCapacity = bufferCapacity;
+ }
+
+ public Map<String, String> getKuduColNameToPOJOFieldNameMap()
+ {
+ return kuduColNameToPOJOFieldNameMap;
+ }
+
+ public void setKuduColNameToPOJOFieldNameMap(Map<String, String> kuduColNameToPOJOFieldNameMap)
+ {
+ this.kuduColNameToPOJOFieldNameMap = kuduColNameToPOJOFieldNameMap;
+ }
+
+ public Map<String, ColumnSchema> getKuduColNameToSchemaMapping()
+ {
+ return kuduColNameToSchemaMapping;
+ }
+
+ public void setKuduColNameToSchemaMapping(Map<String, ColumnSchema> kuduColNameToSchemaMapping)
+ {
+ this.kuduColNameToSchemaMapping = kuduColNameToSchemaMapping;
+ }
+
+ public int getMaxTuplesPerWindow()
+ {
+ return maxTuplesPerWindow;
+ }
+
+ public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
+ {
+ this.maxTuplesPerWindow = maxTuplesPerWindow;
+ }
+
+ /**
+ * Note that it is recommended that the buffer capacity be a power of two
+ * @param buffer
+ */
+ public void setBuffer(DisruptorBlockingQueue<KuduRecordWithMeta<T>> buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ public DisruptorBlockingQueue<KuduRecordWithMeta<T>> getBuffer()
+ {
+ return buffer;
+ }
+
+ public WindowDataManager getWindowDataManager()
+ {
+ return windowDataManager;
+ }
+
+ public void setWindowDataManager(WindowDataManager windowDataManager)
+ {
+ this.windowDataManager = windowDataManager;
+ }
+
+ public boolean isFaultTolerantScanner()
+ {
+ return isFaultTolerantScanner;
+ }
+
+ public void setFaultTolerantScanner(boolean faultTolerantScanner)
+ {
+ isFaultTolerantScanner = faultTolerantScanner;
+ }
+
+ public Map<String, String> getOptionsEnabledForCurrentQuery()
+ {
+ return optionsEnabledForCurrentQuery;
+ }
+
+ public void setOptionsEnabledForCurrentQuery(Map<String, String> optionsEnabledForCurrentQuery)
+ {
+ this.optionsEnabledForCurrentQuery = optionsEnabledForCurrentQuery;
+ }
+
+ public boolean isPartitioned()
+ {
+ return isPartitioned;
+ }
+
+ public void setPartitioned(boolean partitioned)
+ {
+ isPartitioned = partitioned;
+ }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduOutputOperator.java
similarity index 97%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
rename to kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduOutputOperator.java
index ff668b0..b171d52 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/AbstractKuduOutputOperator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -31,6 +31,7 @@ import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.classification.InterfaceStability;
@@ -48,6 +49,7 @@ import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.Statistics;
import org.apache.kudu.client.Update;
import org.apache.kudu.client.Upsert;
+
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
@@ -338,8 +340,7 @@ public abstract class AbstractKuduOutputOperator extends BaseOperator
try {
kuduSession.apply(currentOperation);
} catch (KuduException e) {
- LOG.error("Could not execute operation because " + e.getMessage(), e);
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException("Could not execute operation because " + e.getMessage(), e);
}
}
@@ -443,8 +444,7 @@ public abstract class AbstractKuduOutputOperator extends BaseOperator
try {
windowDataManager.committed(windowId);
} catch (IOException e) {
- LOG.error("Error while committing the window id " + windowId + " because " + e.getMessage());
- throw new RuntimeException(e);
+ throw new RuntimeException("Error while committing the window id " + windowId + " because " + e.getMessage(), e );
}
}
@@ -496,15 +496,14 @@ public abstract class AbstractKuduOutputOperator extends BaseOperator
try {
kuduSession.flush();
} catch (KuduException e) {
- LOG.error("Could not flush kudu session on an end window boundary " + e.getMessage(), e);
- throw new RuntimeException(e);
+ throw new RuntimeException("Could not flush kudu session on an end window boundary " + e.getMessage(), e);
}
if ( currentWindowId > windowDataManager.getLargestCompletedWindow()) {
try {
windowDataManager.save(currentWindowId,currentWindowId);
} catch (IOException e) {
- LOG.error("Error while persisting the current window state " + currentWindowId + " because " + e.getMessage());
- throw new RuntimeException(e);
+ throw new RuntimeException("Error while persisting the current window state " + currentWindowId +
+ " because " + e.getMessage(), e);
}
}
numOpsErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.OPS_ERRORS) -
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/ApexKuduConnection.java
similarity index 90%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
rename to kudu/src/main/java/org/apache/apex/malhar/kudu/ApexKuduConnection.java
index 99eaf1b..4dddf82 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/ApexKuduConnection.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
import java.io.Serializable;
import java.util.ArrayList;
@@ -34,7 +34,6 @@ import org.apache.kudu.client.SessionConfiguration;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* <p>Represents a connection to the Kudu cluster. An instance of this class is to be supplied (via a builder pattern to)
* {@link AbstractKuduOutputOperator} to connect to a Kudu cluster.</p>
@@ -52,12 +51,14 @@ public class ApexKuduConnection implements AutoCloseable, Serializable
public static final Logger LOG = LoggerFactory.getLogger(ApexKuduConnection.class);
+ private ApexKuduConnectionBuilder builderForThisConnection;
private ApexKuduConnection(ApexKuduConnectionBuilder builder)
{
checkNotNull(builder,"Builder cannot be null to establish kudu session");
checkArgument(builder.mastersCollection.size() > 0, "Atleast one kudu master needs to be specified");
checkNotNull(builder.tableName,"Kudu table cannot be null");
+ builderForThisConnection = builder;
KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(builder.mastersCollection);
if (builder.isOperationTimeOutSet) {
kuduClientBuilder.defaultOperationTimeoutMs(builder.operationTimeOutMs);
@@ -86,8 +87,7 @@ public class ApexKuduConnection implements AutoCloseable, Serializable
kuduTable = kuduClient.openTable(builder.tableName);
}
} catch (Exception e) {
- LOG.error("Kudu table existence could not be ascertained " + e.getMessage());
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException("Kudu table existence could not be ascertained " + e.getMessage(), e);
}
}
@@ -128,8 +128,20 @@ public class ApexKuduConnection implements AutoCloseable, Serializable
this.kuduClient = kuduClient;
}
- public static class ApexKuduConnectionBuilder
+ public ApexKuduConnectionBuilder getBuilderForThisConnection()
+ {
+ return builderForThisConnection;
+ }
+
+ public void setBuilderForThisConnection(ApexKuduConnectionBuilder builderForThisConnection)
{
+ this.builderForThisConnection = builderForThisConnection;
+ }
+
+ public static class ApexKuduConnectionBuilder implements Serializable
+ {
+ private static final long serialVersionUID = -3428649955056723311L;
+
List<String> mastersCollection = new ArrayList<>();
String tableName;
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/BaseKuduOutputOperator.java
similarity index 96%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
rename to kudu/src/main/java/org/apache/apex/malhar/kudu/BaseKuduOutputOperator.java
index 6c7e9a6..794d7e7 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/BaseKuduOutputOperator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
import java.io.IOException;
import java.io.InputStream;
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Properties;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.kudu.client.ExternalConsistencyMode;
import org.apache.kudu.client.SessionConfiguration;
@@ -53,7 +52,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class BaseKuduOutputOperator extends AbstractKuduOutputOperator
{
- public static final String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduoutputoperator.properties";
+ public static String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduoutputoperator.properties";
public static final String TABLE_NAME = "tablename";
@@ -84,6 +83,7 @@ public class BaseKuduOutputOperator extends AbstractKuduOutputOperator
InputStream kuduPropsFileAsStream = loader.getResourceAsStream(configFileInClasspath);
if (kuduPropsFileAsStream != null) {
kuduConnectionProperties.load(kuduPropsFileAsStream);
+ kuduPropsFileAsStream.close();
} else {
throw new IOException("Properties file required for Kudu connection " + configFileInClasspath +
" is not locatable in the root classpath");
@@ -151,7 +151,7 @@ public class BaseKuduOutputOperator extends AbstractKuduOutputOperator
* The default is to implement for ATLEAST_ONCE semantics. Override this control this behavior.
* @param executionContext The tuple which represents the execution context along with the payload
* @param reconcilingWindowId The window Id of the reconciling window
- * @return
+ * @return True if the current row is to be written to the Kudu Store, false to be skipped
*/
@Override
protected boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperator.java
new file mode 100644
index 0000000..a9de1d4
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperator.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.SessionConfiguration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <p>A Kudu input operator that provides the functionality of scanning a Kudu table by incrementing
+ * a column in steps as specified in the configuration. Each iteration scans the table as given by the
+ * SQL expression in the properties file. Other aspects like connection details are also read from the properties file.
+ * </p>
+ *
+ * <p>
+ * Here is an example property file format.
+ * Place a file named kuduincrementalstepscaninputoperator.properties anywhere in the classpath ( say by
+ * bundling in the resources section ). Alternately one can use the alternative constructor to specify the filename.
+ * The properties can be set as follows:
+ * </p>
+ *
+ * <p>
+ * masterhosts=192.168.1.41:7051,192.168.1.54:7051
+ * tablename=kudutablename
+ * templatequerystring=SELECT * FROM KUDUTABLE WHERE COL1 > 1234 AND TIMESTAMPCOL >= :lowerbound AND TIMESTAMPCOL < :upperbound
+ * templatequerylowerboundparamname = :lowerbound
+ * templatequeryupperboundparamname = :upperbound
+ * templatequeryseedvalue = 1503001033219
+ * templatequeryincrementalvalue = 120000
+ *
+ * The above property file will scan a table called KUDUTABLE and only scan those rows for which a column named COL1
+ * is greater than 1234. It also uses a column named TIMESTAMPCOL to start with the starting value of the lower
+ * bound as 1503001033219 and increments the scan window by 2 minutes. Note that :lowerbound and :upperbound are
+ * just template names that will be used for string substitution as the time windows progress.
+ *
+ * Note that the implementation assumes the templated column data type is of type long. Considering other types
+ * is a trivial extension and perhaps best achieved by extending the Abstract implementation.
+ * </p>
+ */
+public class IncrementalStepScanInputOperator<T,C extends InputOperatorControlTuple>
+    extends AbstractKuduInputOperator<T,C>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(IncrementalStepScanInputOperator.class);
+
+  // The incremental value of the templated column as read from the properties file
+  private long stepUpValue;
+
+  // The initial value of the templated column as read from the properties file
+  private long startSeedValue;
+
+  // Lower bound (inclusive) that will be substituted into the next generated query
+  private long currentLowerBound;
+
+  // Upper bound (exclusive) that will be substituted into the next generated query
+  private long currentUpperBound;
+
+  // The SQL expression template containing the lower and upper bound placeholders
+  private String queryTemplateString;
+
+  // The name of the parameter in the templated string that represents the lower bound
+  private String lowerBoundParamName;
+
+  // The name of the parameter in the templated string that represents the upper bound
+  private String upperBoundParamName;
+
+  public static String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduincrementalstepscaninputoperator.properties";
+
+  public static final String TABLE_NAME = "tablename";
+
+  public static final String MASTER_HOSTS = "masterhosts";
+
+  public static final String POJO_CLASS_NAME = "pojoclassname";
+
+  // The name of the property in properties file that gives the query expression as a template
+  public static final String TEMPLATE_QUERY_STRING = "templatequerystring";
+
+  // The name of the property that specifies the lower bound parameter name used for incrementing
+  public static final String TEMPLATE_QUERY_LOWERBOUND_PARAMNAME = "templatequerylowerboundparamname";
+
+  // The name of the property that specifies the upper bound parameter name used for incrementing
+  public static final String TEMPLATE_QUERY_UPPERBOUND_PARAMNAME = "templatequeryupperboundparamname";
+
+  // The name of the property that specifies the value of the seed/starting value for the templated column
+  public static final String TEMPLATE_QUERY_SEED_VALUE = "templatequeryseedvalue";
+
+  // The name of the property that specifies the value of the increment for the templated column
+  public static final String TEMPLATE_QUERY_INCREMENT_STEP_VALUE = "templatequeryincrementalvalue";
+
+  private ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder;
+
+  /***
+   * To be used only by the Kryo deserialization logic and not encouraged for generic use.
+   * @throws IOException If the default properties file exists but cannot be read
+   * @throws NumberFormatException If the seed or step values cannot be parsed as a long
+   */
+  public IncrementalStepScanInputOperator() throws IOException, NumberFormatException
+  {
+    initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME);
+  }
+
+  /***
+   * Builds the operator from the given properties file and registers the POJO class representing a scanned row.
+   * @param classForPojo The class of the POJO that each scanned Kudu row is translated into
+   * @param configFileInClasspath The name of the properties file locatable in the classpath
+   * @throws IOException If the properties file cannot be read
+   * @throws NumberFormatException If the seed or step values cannot be parsed as a long
+   */
+  public IncrementalStepScanInputOperator(Class<T> classForPojo, String configFileInClasspath)
+    throws IOException, NumberFormatException
+  {
+    initConnectionBuilderProperties(configFileInClasspath);
+    clazzForResultObject = classForPojo;
+  }
+
+  /***
+   * Instantiates all of the common configurations. Also does the initiation logic for the mandatory fields in super
+   * class. If the default properties file cannot be located, returns after logging a warning so that the user can
+   * set the configuration manually via the setters.
+   * @param configFileInClasspath The properties file name
+   * @throws IOException If not able to read properties file
+   * @throws NumberFormatException If not able to format string as long
+   */
+  private void initConnectionBuilderProperties(String configFileInClasspath)
+    throws IOException, NumberFormatException
+  {
+    Properties incrementalStepProperties = new Properties();
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    InputStream kuduPropsFileAsStream = loader.getResourceAsStream(configFileInClasspath);
+    if (kuduPropsFileAsStream != null) {
+      try {
+        incrementalStepProperties.load(kuduPropsFileAsStream);
+      } finally {
+        // close the stream irrespective of the load outcome to avoid a resource leak
+        kuduPropsFileAsStream.close();
+      }
+    } else {
+      if (!DEFAULT_CONNECTION_PROPS_FILE_NAME.equalsIgnoreCase(configFileInClasspath)) {
+        throw new IOException("Properties file required for Kudu connection " + configFileInClasspath +
+            " is not locatable in the root classpath");
+      } else {
+        LOG.warn("Properties file could not be loaded. Expecting the user to set properties manually");
+        // Nothing to parse; all configuration is expected to be supplied via the setters
+        return;
+      }
+    }
+    tableName = checkNotNull(incrementalStepProperties.getProperty(TABLE_NAME));
+    String masterHostsConnectionString = checkNotNull(incrementalStepProperties.getProperty(MASTER_HOSTS));
+    String[] masterAndHosts = masterHostsConnectionString.split(",");
+    lowerBoundParamName = checkNotNull(incrementalStepProperties.getProperty(
+        TEMPLATE_QUERY_LOWERBOUND_PARAMNAME));
+    upperBoundParamName = checkNotNull(incrementalStepProperties.getProperty(
+        TEMPLATE_QUERY_UPPERBOUND_PARAMNAME));
+    String stepUpValueString = checkNotNull(incrementalStepProperties.getProperty(TEMPLATE_QUERY_INCREMENT_STEP_VALUE));
+    String seedValueString = checkNotNull(incrementalStepProperties.getProperty(TEMPLATE_QUERY_SEED_VALUE));
+    queryTemplateString = checkNotNull(incrementalStepProperties.getProperty(TEMPLATE_QUERY_STRING));
+    startSeedValue = Long.valueOf(seedValueString);
+    stepUpValue = Long.valueOf(stepUpValueString);
+    currentLowerBound = startSeedValue;
+    // The first scan window spans exactly one step from the seed value ( not seed + seed )
+    currentUpperBound = currentLowerBound + stepUpValue;
+    initKuduConfig(tableName, Arrays.asList(masterAndHosts));
+  }
+
+  /***
+   * A simple init of the kudu connection config. Override this if you would like to fine tune the connection parameters
+   * @param kuduTableName The Kudu table name
+   * @param kuduMasters The master hosts of the Kudu cluster.
+   */
+  public void initKuduConfig(String kuduTableName, List<String> kuduMasters)
+  {
+    apexKuduConnectionBuilder = new ApexKuduConnection.ApexKuduConnectionBuilder()
+        .withTableName(kuduTableName)
+        .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
+        .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
+        .withNumberOfBossThreads(1)
+        .withNumberOfWorkerThreads(2)
+        .withSocketReadTimeOutAs(3000)
+        .withOperationTimeOutAs(3000);
+    for ( String aMasterAndHost: kuduMasters ) {
+      apexKuduConnectionBuilder = apexKuduConnectionBuilder.withAPossibleMasterHostAs(aMasterAndHost);
+    }
+    apexKuduConnectionInfo = apexKuduConnectionBuilder;
+  }
+
+  /***
+   * A simple template replacement logic which provides the next window of the query by replacing the
+   * lower and upper bound parameters accordingly.
+   * @return Returns the next in line query as a String that is compliant with the Kudu SQL grammar.
+   */
+  @Override
+  protected String getNextQuery()
+  {
+    String queryToUse = queryTemplateString
+        .replace(lowerBoundParamName, Long.toString(currentLowerBound))
+        .replace(upperBoundParamName, Long.toString(currentUpperBound));
+    // Slide the scan window forward by one step for the subsequent iteration
+    currentLowerBound = currentUpperBound;
+    currentUpperBound = currentLowerBound + stepUpValue;
+    return queryToUse;
+  }
+
+  public long getStepUpValue()
+  {
+    return stepUpValue;
+  }
+
+  public void setStepUpValue(long stepUpValue)
+  {
+    this.stepUpValue = stepUpValue;
+  }
+
+  public long getStartSeedValue()
+  {
+    return startSeedValue;
+  }
+
+  public void setStartSeedValue(long startSeedValue)
+  {
+    this.startSeedValue = startSeedValue;
+    currentLowerBound = startSeedValue;
+    // The upper bound advances by the step size, not by the seed value
+    currentUpperBound = currentLowerBound + stepUpValue;
+  }
+
+  public String getQueryTemplateString()
+  {
+    return queryTemplateString;
+  }
+
+  public void setQueryTemplateString(String queryTemplateString)
+  {
+    this.queryTemplateString = queryTemplateString;
+  }
+
+  public String getLowerBoundParamName()
+  {
+    return lowerBoundParamName;
+  }
+
+  public void setLowerBoundParamName(String lowerBoundParamName)
+  {
+    this.lowerBoundParamName = lowerBoundParamName;
+  }
+
+  public String getUpperBoundParamName()
+  {
+    return upperBoundParamName;
+  }
+
+  public void setUpperBoundParamName(String upperBoundParamName)
+  {
+    this.upperBoundParamName = upperBoundParamName;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/InputOperatorControlTuple.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/InputOperatorControlTuple.java
new file mode 100644
index 0000000..b591acf
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/InputOperatorControlTuple.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import org.apache.apex.api.operator.ControlTuple;
+
+/**
+ * A simple control tuple class that is used to represent a begin or end of a given SQL expression.
+ */
+public class InputOperatorControlTuple implements ControlTuple
+{
+  // The SQL expression whose result stream this control tuple brackets
+  private String query;
+
+  // True when this tuple marks the beginning of a new query's result stream
+  private boolean beginNewQuery;
+
+  // True when this tuple marks the end of the current query's result stream
+  private boolean endCurrentQuery;
+
+  // Free-form message carried along with the control tuple; presumably for downstream context — TODO confirm usage
+  private String controlMessage;
+
+  /**
+   * Control tuples for query boundaries are always delivered immediately.
+   * @return {@link DeliveryType#IMMEDIATE}
+   */
+  @Override
+  public DeliveryType getDeliveryType()
+  {
+    return DeliveryType.IMMEDIATE;
+  }
+
+  public String getQuery()
+  {
+    return query;
+  }
+
+  public void setQuery(String query)
+  {
+    this.query = query;
+  }
+
+  public boolean isBeginNewQuery()
+  {
+    return beginNewQuery;
+  }
+
+  public void setBeginNewQuery(boolean beginNewQuery)
+  {
+    this.beginNewQuery = beginNewQuery;
+  }
+
+  public boolean isEndCurrentQuery()
+  {
+    return endCurrentQuery;
+  }
+
+  public void setEndCurrentQuery(boolean endCurrentQuery)
+  {
+    this.endCurrentQuery = endCurrentQuery;
+  }
+
+  public String getControlMessage()
+  {
+    return controlMessage;
+  }
+
+  public void setControlMessage(String controlMessage)
+  {
+    this.controlMessage = controlMessage;
+  }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/KuduExecutionContext.java
similarity index 98%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
rename to kudu/src/main/java/org/apache/apex/malhar/kudu/KuduExecutionContext.java
index a346705..ca6b2ff 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/KuduExecutionContext.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/KuduMutationType.java
similarity index 95%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/KuduMutationType.java
index 64b46c6..eda0ac5 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/KuduMutationType.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
/**
* <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/package-info.java
similarity index 54%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/package-info.java
index 64b46c6..5f54ba3 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/package-info.java
@@ -16,17 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
-
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+/***
+ * Kudu package contains the Operator implementations for both input and output. There is a base implementations for the
+ * Output Operator. {@link org.apache.apex.malhar.kudu.BaseKuduOutputOperator}. For custom configurability use
+ * {@link org.apache.apex.malhar.kudu.AbstractKuduOutputOperator}. Similarly there is a default implementation for the
+ * Kudu Input operator. {@link org.apache.apex.malhar.kudu.IncrementalStepScanInputOperator}. Extend the
+ * {@link org.apache.apex.malhar.kudu.AbstractKuduInputOperator} for configurability.
*/
-public enum KuduMutationType
-{
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.kudu;
- INSERT,
- DELETE,
- UPDATE,
- UPSERT
-}
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitioner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitioner.java
new file mode 100644
index 0000000..de3859b
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitioner.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.ApexKuduConnection;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduTable;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * An abstract class that contains logic that is common across all partitioners available for the Kudu input operator.
+ */
+public abstract class AbstractKuduInputPartitioner implements Partitioner<AbstractKuduInputOperator>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduInputPartitioner.class);
+
+  // The logical operator instance whose clones become the physical partitions
+  @JsonIgnore
+  protected AbstractKuduInputOperator prototypeKuduInputOperator;
+
+  public AbstractKuduInputPartitioner(AbstractKuduInputOperator prototypeOperator)
+  {
+    prototypeKuduInputOperator = prototypeOperator;
+  }
+
+  /**
+   * Builds a set of scan tokens. The list of scan tokens are generated as if the entire table is being scanned
+   * i.e. a SELECT * FROM TABLE equivalent expression. This list is used to assign the partition pie assignments
+   * for all of the planned partition of operators. Each operator gets a part of the PIE as if all columns were
+   * selected. Subsequently when a query is to be processed, the query is used to generate the scan tokens applicable
+   * for that query. Given that partition pie represents the entire data set, the scan assignments for the current
+   * query will be a subset.
+   * @return The list of scan tokens as if the entire table is getting scanned.
+   * @throws Exception in cases when the connection to kudu cluster cannot be closed.
+   */
+  public List<KuduScanToken> getKuduScanTokensForSelectAllColumns() throws Exception
+  {
+    // We are not using the current query for deciding the partition strategy but a SELECT * as
+    // we do not want to optimize on just the current query. This prevents rapid throttling of operator
+    // instances when the scan patterns are erratic. On the other hand, this might result in under utilized
+    // operator resources in the DAG but will be consistent at a minimum.
+    ApexKuduConnection apexKuduConnection = prototypeKuduInputOperator.getApexKuduConnectionInfo().build();
+    KuduClient clientHandle = apexKuduConnection.getKuduClient();
+    KuduTable table = apexKuduConnection.getKuduTable();
+    KuduScanToken.KuduScanTokenBuilder builder = clientHandle.newScanTokenBuilder(table);
+    List<String> allColumns = new ArrayList<>();
+    List<ColumnSchema> columnList = apexKuduConnection.getKuduTable().getSchema().getColumns();
+    for ( ColumnSchema column : columnList) {
+      allColumns.add(column.getName());
+    }
+    builder.setProjectedColumnNames(allColumns);
+    LOG.debug("Building the partition pie assignments for the input operator");
+    List<KuduScanToken> allPossibleTokens = builder.build();
+    apexKuduConnection.close();
+    return allPossibleTokens;
+  }
+
+  /***
+   * Returns the number of partitions. The configuration is used as the source of truth for
+   * number of partitions. The config is then overridden if the code in the client driver code explicitly sets
+   * the number of partitions.
+   * @param context The context as provided by the launcher
+   * @return The number of partitions that are planned for this input operator. At a minimum it is 1.
+   */
+  public int getNumberOfPartitions(PartitioningContext context)
+  {
+    int proposedPartitionCount = context.getParallelPartitionCount();
+    if ( prototypeKuduInputOperator.getNumberOfPartitions() != -1 ) { // -1 is the default
+      // There is a manual override of partitions from code. Use this
+      proposedPartitionCount = prototypeKuduInputOperator.getNumberOfPartitions();
+      LOG.info(" Set the partition count based on the code as opposed to configuration ");
+    }
+    if ( proposedPartitionCount <= 0) {
+      // Parallel partitions not enabled. But the design is to use one to many mapping. Hence defaulting to one
+      LOG.info(" Defaulting to one partition as parallel partitioning is not enabled");
+      proposedPartitionCount = 1;
+    }
+    LOG.info(" Planning to use " + proposedPartitionCount + " partitions");
+    return proposedPartitionCount;
+  }
+
+  /***
+   * Builds a list of scan assignment metadata instances from raw kudu scan tokens as returned by the Kudu Query planner
+   * assuming all of the columns and rows are to be scanned
+   * @param partitions The current set of partitions
+   * @param context The current partitioning context
+   * @return The new set of partitions
+   * @throws Exception if the Kudu connection opened for generating the scan plan cannot be closed
+   */
+  public List<KuduPartitionScanAssignmentMeta> getListOfPartitionAssignments(
+      Collection<Partition<AbstractKuduInputOperator>> partitions, PartitioningContext context) throws Exception
+  {
+    List<KuduPartitionScanAssignmentMeta> returnList = new ArrayList<>();
+    List<KuduScanToken> allColumnsScanTokens = new ArrayList<>();
+    // we are looking at a first time invocation scenario
+    try {
+      allColumnsScanTokens.addAll(getKuduScanTokensForSelectAllColumns());
+    } catch (Exception e) {
+      LOG.error(" Error while calculating the number of scan tokens for all column projections " + e.getMessage(),e);
+    }
+    if ( allColumnsScanTokens.size() == 0 ) {
+      LOG.error("No column information could be extracted from the Kudu table");
+      throw new Exception("No column information could be extracted from the Kudu table");
+    }
+    int totalPartitionCount = allColumnsScanTokens.size();
+    LOG.info("Determined maximum as " + totalPartitionCount + " tablets for this table");
+    for (int i = 0; i < totalPartitionCount; i++) {
+      KuduPartitionScanAssignmentMeta aMeta = new KuduPartitionScanAssignmentMeta();
+      aMeta.setOrdinal(i);
+      aMeta.setTotalSize(totalPartitionCount);
+      returnList.add(aMeta);
+      LOG.info("A planned scan meta of the total partitions " + aMeta);
+    }
+    LOG.info("Total kudu partition size is " + returnList.size());
+    return returnList;
+  }
+
+  /***
+   * Clones the original operator and sets the partition pie assignments for this operator. Kryo is used for cloning
+   * @param scanAssignmentsForThisPartition The partition pie that is assigned to the operator according to the
+   *   configured partitioner
+   * @return The partition that is used by the runtime to launch the new physical operator instance
+   */
+  public Partitioner.Partition<AbstractKuduInputOperator> clonePartitionAndAssignScanMeta(
+      List<KuduPartitionScanAssignmentMeta> scanAssignmentsForThisPartition)
+  {
+    Partitioner.Partition<AbstractKuduInputOperator> clonedKuduInputOperator =
+        new DefaultPartition<>(KryoCloneUtils.cloneObject(prototypeKuduInputOperator));
+    clonedKuduInputOperator.getPartitionedInstance().setPartitionPieAssignment(scanAssignmentsForThisPartition);
+    return clonedKuduInputOperator;
+  }
+
+  /***
+   * Implements the main logic for defining partitions. The logic to create a partition is pretty straightforward. A
+   * SELECT * expression is used to generate a plan ( a list of kudu scan tokens). The list is then evenly
+   * distributed to each of the physical operator instances as defined by the configuration. At runtime when a new query
+   * needs to be processed, a plan specific to the query is generated and the planned list of kudu scan tokens
+   * for that query are distributed among the available operator instances. Note that we define the partitions only once
+   * during the first invocation. This is because it is ideal we do not optimize on the current query that is running
+   * before we can dynamically partition and cause too much throttling based on the query patterns. Please see
+   * {@link AbstractKuduPartitionScanner#preparePlanForScanners(SQLToKuduPredicatesTranslator)} for details as to how
+   * a query based planning is done.
+   * @param partitions The current set of partitions ( may be null on the very first invocation )
+   * @param context The partitioning context as provided by the runtime platform.
+   * @return The collection of planned partitions as implemented by the partitioner that is configured by the user
+   */
+  @Override
+  public Collection<Partition<AbstractKuduInputOperator>> definePartitions(
+      Collection<Partition<AbstractKuduInputOperator>> partitions, PartitioningContext context)
+  {
+    if ( partitions != null) {
+      LOG.info("The current partitioner plan has " + partitions.size() + " operators before redefining");
+    }
+    List<Partition<AbstractKuduInputOperator>> partitionsForInputOperator = new ArrayList<>();
+    List<KuduPartitionScanAssignmentMeta> assignmentMetaList = new ArrayList<>();
+    try {
+      assignmentMetaList.addAll(getListOfPartitionAssignments(partitions,context));
+    } catch (Exception e) {
+      throw new RuntimeException("Aborting partition planning as Kudu meta data could not be obtained", e);
+    }
+    LOG.info("Maximum possible Kudu input operator partition count is " + assignmentMetaList.size());
+    Map<Integer,List<KuduPartitionScanAssignmentMeta>> assignments = assign(assignmentMetaList,context);
+    boolean requiresRepartitioning = false;
+    if ( partitions == null) {
+      requiresRepartitioning = true;
+    } else {
+      for ( Partition<AbstractKuduInputOperator> aPartition : partitions) {
+        if ( !aPartition.getPartitionedInstance().isPartitioned()) {
+          requiresRepartitioning = true;
+          break;
+        }
+      }
+    }
+    if ( requiresRepartitioning ) {
+      // Guard against the first time invocation where the incoming collection can be null
+      if ( partitions != null) {
+        LOG.info("Clearing all of the current partitions and setting up new ones");
+        partitions.clear();
+      }
+      for ( int i = 0; i < assignments.size(); i++) {
+        List<KuduPartitionScanAssignmentMeta> assignmentForThisOperator = assignments.get(i);
+        partitionsForInputOperator.add(clonePartitionAndAssignScanMeta(assignmentForThisOperator));
+        LOG.info("Assigned apex operator " + i + " with " + assignmentForThisOperator.size() + " kudu mappings");
+      }
+      LOG.info("Returning " + partitionsForInputOperator.size() + " partitions for the input operator");
+      return partitionsForInputOperator;
+    } else {
+      LOG.info("Not making any changes to the partitions");
+      return partitions;
+    }
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractKuduInputOperator>> partitions)
+  {
+    // No additional processing is required once the partitions have been deployed
+  }
+
+  /***
+   * Abstract method that will be implemented by the individual partition planners basing on the functionality they
+   * provide. Please see {@link KuduOneToManyPartitioner} and {@link KuduOneToOnePartitioner} for specific
+   * implementations
+   * @param totalList The total list of possible tablet scans for all queries
+   * @param context The context that is provided by the framework when repartitioning is to be executed
+   * @return A map that gives the operator number as key and the list of {@link KuduPartitionScanAssignmentMeta} that
+   * are assigned for the operator with that number.
+   */
+  public abstract Map<Integer, List<KuduPartitionScanAssignmentMeta>> assign(
+      List<KuduPartitionScanAssignmentMeta> totalList, PartitioningContext context);
+
+  public AbstractKuduInputOperator getPrototypeKuduInputOperator()
+  {
+    return prototypeKuduInputOperator;
+  }
+
+  public void setPrototypeKuduInputOperator(AbstractKuduInputOperator prototypeKuduInputOperator)
+  {
+    this.prototypeKuduInputOperator = prototypeKuduInputOperator;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitioner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitioner.java
new file mode 100644
index 0000000..1313778
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitioner.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+
+
+/**
+ * Used when a user would like to assign multiple kudu tablets to a single physical instance of the Kudu input operator
+ */
+public class KuduOneToManyPartitioner extends AbstractKuduInputPartitioner
+{
+  private static final Logger LOG = LoggerFactory.getLogger(KuduOneToManyPartitioner.class);
+
+  public KuduOneToManyPartitioner(AbstractKuduInputOperator prototypeOperator)
+  {
+    super(prototypeOperator);
+  }
+
+  /**
+   * Spreads the planned tablet scans evenly across the physical instances of the kudu input operator in a
+   * round robin fashion. Please see
+   * {@link AbstractKuduPartitionScanner#preparePlanForScanners(SQLToKuduPredicatesTranslator)} for details as to how
+   * a query based planning is done.
+   * @param totalList The total list of possible tablet scans for all queries
+   * @param context The context that is provided by the framework when repartitioning is to be executed
+   * @return Map keyed by operator id, whose value is the list of scan assignments for that operator.
+   */
+  @Override
+  public Map<Integer, List<KuduPartitionScanAssignmentMeta>> assign(List<KuduPartitionScanAssignmentMeta> totalList,
+      PartitioningContext context)
+  {
+    Map<Integer,List<KuduPartitionScanAssignmentMeta>> partitionAssignments = new HashMap<>();
+    int partitionCount = getNumberOfPartitions(context);
+    if ( partitionCount <= 0) {
+      LOG.error(" Partition count cannot be zero ");
+      // fall back to a single partition so that the scan can still proceed
+      partitionCount = 1;
+    }
+    int idealDistributionRatio = (totalList.size() / partitionCount) + 1;
+    LOG.info(" Distributing not more than " + idealDistributionRatio + " partitions per input operator");
+    // Seed every operator id with an empty assignment list so that no operator is left unmapped
+    for (int operatorId = 0; operatorId < partitionCount; operatorId++) {
+      partitionAssignments.put(operatorId, new ArrayList<KuduPartitionScanAssignmentMeta>());
+    }
+    // Round robin each scan assignment: element k lands on operator ( k modulo partitionCount ), so every
+    // apex partitioned operator takes part in the scanning effort
+    for (int scanIndex = 0; scanIndex < totalList.size(); scanIndex++) {
+      partitionAssignments.get(scanIndex % partitionCount).add(totalList.get(scanIndex));
+    }
+    return partitionAssignments;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitioner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitioner.java
new file mode 100644
index 0000000..e477e83
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitioner.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+
+/**
+ * A partitioner that assigns one to one mapping of each kudu tablet to one physical instance of Kudu input operator.
+ */
+public class KuduOneToOnePartitioner extends AbstractKuduInputPartitioner
+{
+ private static final Logger LOG = LoggerFactory.getLogger(KuduOneToOnePartitioner.class);
+
+ public KuduOneToOnePartitioner(AbstractKuduInputOperator prototypeOperator)
+ {
+ super(prototypeOperator);
+ }
+
+ /***
+ * Takes the total list of possible partition scans from a SELECT * expression and then distributes one
+ * tablet per operator id. Note that the operator id is just an integer representation in this method. See
+ * {@link AbstractKuduInputPartitioner.PartitioningContext#definePartitions(Collection, PartitioningContext)} where
+ * this method is used to assign the plan to the actual operator instances. Please see
+ * {@link AbstractKuduPartitionScanner#preparePlanForScanners(SQLToKuduPredicatesTranslator)} for details as to how
+ * a query based planning is done.
+ * @param totalList The total list of possible tablet scans for all queries
+ * @param context The context that is provided by the framework when repartitioning is to be executed
+ * @return A Map of an operator identifier to the list of partition assignments.Note the Operator identifier is a
+ * simple ordinal numbering of the operator and not the actual operator id.
+ */
+ @Override
+ public Map<Integer, List<KuduPartitionScanAssignmentMeta>> assign(List<KuduPartitionScanAssignmentMeta> totalList,
+ PartitioningContext context)
+ {
+ Map<Integer,List<KuduPartitionScanAssignmentMeta>> partitionAssignments = new HashMap<>();
+ // One tablet scan per operator ordinal: map entry i holds exactly the i-th scan assignment.
+ for ( int i = 0; i < totalList.size(); i++) {
+ List<KuduPartitionScanAssignmentMeta> assignmentForThisPartition = new ArrayList<>();
+ assignmentForThisPartition.add(totalList.get(i));
+ partitionAssignments.put(i,assignmentForThisPartition);
+ }
+ return partitionAssignments;
+ }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduPartitionScanStrategy.java
similarity index 74%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduPartitionScanStrategy.java
index 64b46c6..c7e1bc4 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/KuduPartitionScanStrategy.java
@@ -16,17 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu.partitioner;
/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+ * Enumerates the supported strategies for distributing Kudu tablet scans across Apex input operator partitions.
*/
-public enum KuduMutationType
+public enum KuduPartitionScanStrategy
{
-
- INSERT,
- DELETE,
- UPDATE,
- UPSERT
+ ONE_TABLET_PER_OPERATOR,
+ MANY_TABLETS_PER_OPERATOR
}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/package-info.java
similarity index 58%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/package-info.java
index 64b46c6..cb54abc 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/partitioner/package-info.java
@@ -16,17 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
-
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+/***
+ * The partitioner package classes are responsible for creating the right number of Apex operators for a given kudu
+ * table that is being scanned. There are two configuration modes that are provided. Map one Kudu partition to
+ * one Apex operator or alternately map multiple Kudu tablets to one Apex operator. The bulk of the implementation
+ * is in the {@link org.apache.apex.malhar.kudu.partitioner.AbstractKuduInputPartitioner}.
*/
-public enum KuduMutationType
-{
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.kudu.partitioner;
- INSERT,
- DELETE,
- UPDATE,
- UPSERT
-}
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScanner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScanner.java
new file mode 100644
index 0000000..b998f60
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScanner.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.ApexKuduConnection;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.sqltranslator.KuduSQLParseTreeListener;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.kudu.client.AsyncKuduScanner;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An abstract class that contains logic common to all types of Scanner. A scanner is responsible for scanning rows from
+ * the kudu table based on the incoming SQL query. See {@link KuduPartitionConsistentOrderScanner}
+ * and {@link KuduPartitionRandomOrderScanner} for options available as scanners.
+ */
+public abstract class AbstractKuduPartitionScanner<T,C extends InputOperatorControlTuple>
+{
+ // The physical input operator instance that owns this scanner; excluded from JSON serialization.
+ @JsonIgnore
+ AbstractKuduInputOperator<T,C> parentOperator;
+
+ // Thread pool that executes the scan jobs; sized via threadPoolExecutorSize in initScannerCommons().
+ ExecutorService kuduConsumerExecutor;
+
+ // Builder template used to create (and re-create on staleness) Kudu connections for this scanner.
+ ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionInfo;
+
+ // Number of consumer threads and pooled connections; concrete scanners set this before
+ // calling initScannerCommons().
+ int threadPoolExecutorSize = 1;
+
+ // One Kudu connection per consumer thread, keyed by the thread index [0, threadPoolExecutorSize).
+ Map<Integer, ApexKuduConnection> connectionPoolForThreads = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduPartitionScanner.class);
+
+ /***
+ * Scans all of the scans planned for the current input operator. Note that the current implementations are
+ * threaded implementations. Depending on the scanner, there are multiple threads or single thread
+ * @param parsedQuery Instance of the parser that parsed the incoming query.
+ * @param setters Setters that are resolved for the POJO class that represents the Kudu row
+ * @return The total number of records that have been scanned for the passed query
+ * @throws IOException
+ */
+ public abstract int scanAllRecords(SQLToKuduPredicatesTranslator parsedQuery,
+ Map<String,Object> setters) throws IOException;
+
+ public AbstractKuduPartitionScanner(AbstractKuduInputOperator<T,C> parentOperatorRunningThisScanner)
+ {
+ parentOperator = parentOperatorRunningThisScanner;
+ apexKuduConnectionInfo = parentOperator.getApexKuduConnectionInfo();
+ }
+
+ /***
+ * Implements common initialization logic across all types of scanners. Currently it is 1. Creating the thread pool
+ * service for executors and 2. Creating the requisite connection pool for the scanners to use.
+ */
+ public void initScannerCommons()
+ {
+ kuduConsumerExecutor = Executors.newFixedThreadPool(threadPoolExecutorSize);
+ List<KuduPartitionScanAssignmentMeta> allPartitionsThatNeedScan = parentOperator.getPartitionPieAssignment();
+ // Sorts the parent's assignment list in place so scans are always walked in ordinal order.
+ Collections.sort(allPartitionsThatNeedScan,
+ new Comparator<KuduPartitionScanAssignmentMeta>()
+ {
+ @Override
+ public int compare(KuduPartitionScanAssignmentMeta left, KuduPartitionScanAssignmentMeta right)
+ {
+ // NOTE(review): subtraction-based comparison is safe only while ordinals are small and
+ // non-negative, which holds for tablet counts — confirm if ordinals can ever be negative.
+ return left.getOrdinal() - right.getOrdinal();
+ }
+ });
+ // One connection per consumer thread, keyed by the same index the scanner threads use.
+ for ( int i = 0; i < threadPoolExecutorSize; i++) {
+ connectionPoolForThreads.put(i,apexKuduConnectionInfo.build());
+ }
+ LOG.info("Scanner running with " + connectionPoolForThreads.size() + " kudu connections");
+ }
+
+ /***
+ * Used to renew a connection in case it is dead. There can be scenarios when there is a continuous sequence of
+ * queries that do not touch a tablet resulting in inactivity on the kudu client session.
+ * @param indexPos The index position in the connection pool
+ * @return A renewed connection in case it was dead
+ */
+ public ApexKuduConnection verifyConnectionStaleness(int indexPos)
+ {
+ ApexKuduConnection apexKuduConnection = connectionPoolForThreads.get(indexPos);
+ checkNotNull(apexKuduConnection, "Null connection not expected while checking staleness of" +
+ " existing connection");
+ if (apexKuduConnection.getKuduSession().isClosed()) {
+ try {
+ apexKuduConnection.close(); // closes the wrapper
+ } catch (Exception e) {
+ LOG.error(" Could not close a possibly stale kudu connection handle ", e);
+ }
+ LOG.info("Ripped the old kudu connection out and building a new connection for this scanner");
+ // Rebuild from the same builder and replace the pool slot so subsequent lookups get the fresh handle.
+ ApexKuduConnection newConnection = apexKuduConnection.getBuilderForThisConnection().build();
+ connectionPoolForThreads.put(indexPos,newConnection);
+ return newConnection;
+ } else {
+ return apexKuduConnection;
+ }
+ }
+
+ /***
+ * The main logic which takes the parsed in query and builds the Kudu scan tokens specific to this query.
+ * It makes sure that these scan tokens are sorted before the actual scan tokens that are to be executed in the
+ * current physical instance of the operator are shortlisted. Since the kudu scan taken builder gives the scan
+ * tokens for the query and does not differentiate between a distributed system and a single instance system, this
+ * method takes the plan as generated by the Kudu scan token builder and then chooses only those segments that were
+ * decided to be the responsibility of this operator at partitioning time.
+ * @param parsedQuery The parsed query instance
+ * @return A list of partition scan metadata objects that are applicable for this instance of the physical operator
+ * i.e. the operator owning this instance of the scanner.
+ * @throws IOException If the scan assignment cannot be serialized
+ */
+ public List<KuduPartitionScanAssignmentMeta> preparePlanForScanners(SQLToKuduPredicatesTranslator parsedQuery)
+ throws IOException
+ {
+ List<KuduPredicate> predicateList = parsedQuery.getKuduSQLParseTreeListener().getKuduPredicateList();
+ ApexKuduConnection apexKuduConnection = verifyConnectionStaleness(0);// we will have at least one connection
+ KuduScanToken.KuduScanTokenBuilder builder = apexKuduConnection.getKuduClient().newScanTokenBuilder(
+ apexKuduConnection.getKuduTable());
+ // Project only the columns the query referenced, then layer every parsed predicate on top.
+ builder = builder.setProjectedColumnNames(new ArrayList<>(
+ parsedQuery.getKuduSQLParseTreeListener().getListOfColumnsUsed()));
+ for (KuduPredicate aPredicate : predicateList) {
+ builder = builder.addPredicate(aPredicate);
+ }
+ builder.setFaultTolerant(parentOperator.isFaultTolerantScanner());
+ Map<String,String> optionsUsedForThisQuery = parentOperator.getOptionsEnabledForCurrentQuery();
+ if ( optionsUsedForThisQuery.containsKey(KuduSQLParseTreeListener.READ_SNAPSHOT_TIME)) {
+ // An unparseable snapshot time is logged and ignored; the scan then proceeds without
+ // READ_AT_SNAPSHOT semantics rather than failing the query.
+ try {
+ long readSnapShotTime = Long.valueOf(optionsUsedForThisQuery.get(KuduSQLParseTreeListener.READ_SNAPSHOT_TIME));
+ builder = builder.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT);
+ builder = builder.snapshotTimestampMicros(readSnapShotTime);
+ LOG.info("Using read snapshot for this query as " + readSnapShotTime);
+ } catch ( Exception ex) {
+ LOG.error("Cannot parse the Read snaptshot time " + ex.getMessage(), ex);
+ }
+ }
+ List<KuduScanToken> allPossibleScanTokens = builder.build();
+ Collections.sort(allPossibleScanTokens, // Make sure we deal with a sorted list of scan tokens
+ new Comparator<KuduScanToken>()
+ {
+ @Override
+ public int compare(KuduScanToken left, KuduScanToken right)
+ {
+ return left.compareTo(right);
+ }
+ });
+ LOG.info(" Query will scan " + allPossibleScanTokens.size() + " tablets");
+ if ( LOG.isDebugEnabled()) {
+ LOG.debug(" Predicates scheduled for this query are " + predicateList.size());
+ for ( int i = 0; i < allPossibleScanTokens.size(); i++) {
+ LOG.debug("A tablet scheduled for all operators scanning is " + allPossibleScanTokens.get(i).getTablet());
+ }
+ }
+ List<KuduPartitionScanAssignmentMeta> partitionPieForThisOperator = parentOperator.getPartitionPieAssignment();
+ List<KuduPartitionScanAssignmentMeta> returnOfAssignments = new ArrayList<>();
+ int totalScansForThisQuery = allPossibleScanTokens.size();
+ int counterForPartAssignments = 0;
+ for (KuduPartitionScanAssignmentMeta aPartofThePie : partitionPieForThisOperator) {
+ if ( aPartofThePie.getOrdinal() < totalScansForThisQuery) { // a given query plan might have less scantokens
+ KuduPartitionScanAssignmentMeta aMetaForThisQuery = new KuduPartitionScanAssignmentMeta();
+ aMetaForThisQuery.setTotalSize(totalScansForThisQuery);
+ aMetaForThisQuery.setOrdinal(counterForPartAssignments);
+ counterForPartAssignments += 1;
+ aMetaForThisQuery.setCurrentQuery(parsedQuery.getSqlExpresssion());
+ // we pick up only those ordinals that are part of the original partition pie assignment
+ KuduScanToken aTokenForThisOperator = allPossibleScanTokens.get(aPartofThePie.getOrdinal());
+ aMetaForThisQuery.setSerializedKuduScanToken(aTokenForThisOperator.serialize());
+ returnOfAssignments.add(aMetaForThisQuery);
+ LOG.debug("Added query scan for this operator " + aMetaForThisQuery + " with scan tablet as " +
+ allPossibleScanTokens.get(aPartofThePie.getOrdinal()).getTablet());
+ }
+ }
+ LOG.info(" A total of " + returnOfAssignments.size() + " have been scheduled for this operator");
+ return returnOfAssignments;
+ }
+
+ /***
+ * Closes the connection pool that is being maintained for each of the scanners.
+ */
+ public void close()
+ {
+ // NOTE(review): the pooled connections are closed before the executor is shut down, so a scan
+ // task still running in kuduConsumerExecutor could observe a closed connection. Consider
+ // shutting down and awaiting the executor before closing the connections — TODO confirm.
+ for ( int i = 0; i < connectionPoolForThreads.size(); i++) {
+ try {
+ connectionPoolForThreads.get(i).close();
+ } catch (Exception e) {
+ LOG.error("Error while closing kudu connection ",e);
+ }
+ }
+ kuduConsumerExecutor.shutdown();
+ }
+
+ public AbstractKuduInputOperator<T, C> getParentOperator()
+ {
+ return parentOperator;
+ }
+
+ public void setParentOperator(AbstractKuduInputOperator<T, C> parentOperator)
+ {
+ this.parentOperator = parentOperator;
+ }
+
+ public Map<Integer, ApexKuduConnection> getConnectionPoolForThreads()
+ {
+ return connectionPoolForThreads;
+ }
+
+ public void setConnectionPoolForThreads(Map<Integer, ApexKuduConnection> connectionPoolForThreads)
+ {
+ this.connectionPoolForThreads = connectionPoolForThreads;
+ }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionConsistentOrderScanner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionConsistentOrderScanner.java
new file mode 100644
index 0000000..a3ed595
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionConsistentOrderScanner.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <p>A scanner implementation that scans kudu tablet rows in a consistent order. The ordered nature is guaranteed by
+ * the following aspects.</p>
+ * <ol>
+ * <li>All scan tokens for a given query are always ordered</li>
+ * <li>A given operator always takes a select ordinal scan tokens for a given list of scan tokens as step 1 above</li>
+ * <li>The consistent order scanner always scans one tablet after another ( and hence less performant compared to
+ * that of the Random order scanner {@link KuduPartitionRandomOrderScanner}) </li>
+ * </ol>
+ */
+public class KuduPartitionConsistentOrderScanner<T,C extends InputOperatorControlTuple>
+  extends AbstractKuduPartitionScanner<T,C>
+{
+
+  public KuduPartitionConsistentOrderScanner(AbstractKuduInputOperator<T,C> parentOperator)
+  {
+    super(parentOperator);
+    threadPoolExecutorSize = 1; // sequential scanning requires exactly one consumer thread/connection
+    initScannerCommons();
+  }
+
+  /***
+   * Submits a single sequential scanner thread that walks all of the planned scan tokens one after another.
+   * @param parsedQuery Instance of the parser that parsed the incoming query.
+   * @param setters Setters that are resolved for the POJO class that represents the Kudu row
+   * @return The number of scan segments scheduled for this query
+   * @throws IOException If the scan plan cannot be built
+   */
+  @Override
+  public int scanAllRecords(SQLToKuduPredicatesTranslator parsedQuery,
+      Map<String,Object> setters) throws IOException
+  {
+    List<KuduPartitionScanAssignmentMeta> plannedScansForthisQuery = preparePlanForScanners(parsedQuery);
+    kuduConsumerExecutor.submit(new SequentialScannerThread(parsedQuery,setters,plannedScansForthisQuery));
+    return plannedScansForthisQuery.size();
+  }
+
+  /**
+   * A simple thread that ensures that all of the scan tokens are executed one after another. We need this thread
+   * to not block the {@link KuduPartitionConsistentOrderScanner#scanAllRecords(SQLToKuduPredicatesTranslator, Map)} to
+   * not get blocked as it is called during an emitTuple() call. The Future returns the number of records scanned.
+   */
+  public class SequentialScannerThread implements Callable<Long>
+  {
+    SQLToKuduPredicatesTranslator parsedQuery;
+
+    Map<String,Object> settersForThisQuery;
+
+    List<KuduPartitionScanAssignmentMeta> scansForThisQuery;
+
+    // Dedicated single-thread executor so each scan segment completes before the next one starts.
+    ExecutorService executorServiceForSequentialScanner = Executors.newFixedThreadPool(1);
+
+    public SequentialScannerThread(SQLToKuduPredicatesTranslator parsedQueryTree,Map<String,Object> setters,
+        List<KuduPartitionScanAssignmentMeta> plannedScans)
+    {
+      checkNotNull(parsedQueryTree,"Parsed SQL expression cannot be null for scanner");
+      checkNotNull(setters,"Setters cannot be null for the scanner thread");
+      checkNotNull(plannedScans,"Planned scan segments cannot be null for scanner thread");
+      parsedQuery = parsedQueryTree;
+      settersForThisQuery = setters;
+      scansForThisQuery = plannedScans;
+    }
+
+    @Override
+    public Long call() throws Exception
+    {
+      long overallCount = 0;
+      int counterForMeta = 0;
+      for ( KuduPartitionScanAssignmentMeta aMeta : scansForThisQuery) {
+        // The connection pool only holds threadPoolExecutorSize (here: one) entries, so the raw
+        // meta counter must be wrapped; otherwise an operator owning more than one scan segment
+        // would look up a non-existent pool slot and fail the null check inside
+        // verifyConnectionStaleness().
+        KuduPartitionScannerCallable<T,C> aScanJobThread = new KuduPartitionScannerCallable<T,C>(parentOperator,aMeta,
+            verifyConnectionStaleness(counterForMeta % connectionPoolForThreads.size()),settersForThisQuery,parsedQuery);
+        counterForMeta += 1;
+        Future<Long> scanResult = executorServiceForSequentialScanner.submit(aScanJobThread);
+        overallCount += scanResult.get(); // block till we complete this chunk;
+      }
+      executorServiceForSequentialScanner.shutdown(); // release resources
+      return overallCount;
+    }
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionRandomOrderScanner.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionRandomOrderScanner.java
new file mode 100644
index 0000000..9054c46
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionRandomOrderScanner.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+
+/**
+ * <p>A scanner implementation that scans kudu tablet rows in an order which does not guarantee the same
+ * sequence of tuples in case of restart after a crash/kill. The nature of unordered scans is because
+ * of an eager approach to scan kudu tablets as all the thread pool based threads scan in parallel and hence
+ * ordering cannot be guaranteed to the same for each run on the same data set.</p>
+ */
+public class KuduPartitionRandomOrderScanner<T,C extends InputOperatorControlTuple>
+  extends AbstractKuduPartitionScanner<T,C>
+{
+
+  public KuduPartitionRandomOrderScanner(AbstractKuduInputOperator<T,C> parentOperator)
+  {
+    super(parentOperator);
+    // Guard against an operator with no partition assignments: Executors.newFixedThreadPool()
+    // rejects a pool size of zero, so always run with at least one consumer thread.
+    threadPoolExecutorSize = Math.max(1, parentOperator.getPartitionPieAssignment().size());
+    initScannerCommons();
+  }
+
+  /***
+   * Scans all the records for the given query by launching a parallel scan of all the tablets that can
+   * serve the data for the given query.
+   * @param parsedQuery Instance of the parser that parsed the incoming query.
+   * @param setters Setters that are resolved for the POJO class that represents the Kudu row
+   * @return Total number of scan segments scheduled
+   * @throws IOException When the Kudu connection cannot be closed after preparing the plan.
+   */
+  @Override
+  public int scanAllRecords(SQLToKuduPredicatesTranslator parsedQuery, Map<String, Object> setters) throws IOException
+  {
+    int counterForMeta = 0;
+    List<KuduPartitionScanAssignmentMeta> preparedScans = preparePlanForScanners(parsedQuery);
+    // One scan job per planned segment; each job gets its own pooled connection (pool size equals
+    // the partition pie size, so the plain counter is a valid pool index here).
+    for ( KuduPartitionScanAssignmentMeta aMeta : preparedScans) {
+      KuduPartitionScannerCallable<T,C> aScanJobThread = new KuduPartitionScannerCallable<T,C>(parentOperator,aMeta,
+          verifyConnectionStaleness(counterForMeta),setters,parsedQuery);
+      counterForMeta += 1;
+      kuduConsumerExecutor.submit(aScanJobThread);
+    }
+    return preparedScans.size();
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScanAssignmentMeta.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScanAssignmentMeta.java
new file mode 100644
index 0000000..1875aaa
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScanAssignmentMeta.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * The main metadata class that is used to hold the information about the scan token and also the ordinal
+ * position out of the total. Equality and hashing are based on the query string, ordinal and total size;
+ * the serialized scan token bytes are deliberately excluded.
+ */
+public class KuduPartitionScanAssignmentMeta implements Serializable
+{
+  private static final long serialVersionUID = -3074453209476815690L;
+
+  // The SQL expression this scan segment belongs to; null until setCurrentQuery() is called.
+  private String currentQuery;
+
+  // Opaque Kudu scan token bytes produced by KuduScanToken.serialize().
+  private byte[] serializedKuduScanToken;
+
+  // Position of this scan segment among all segments planned for the query.
+  private int ordinal;
+
+  // Total number of scan segments planned for the query.
+  private int totalSize;
+
+  public String getCurrentQuery()
+  {
+    return currentQuery;
+  }
+
+  public void setCurrentQuery(String currentQuery)
+  {
+    this.currentQuery = currentQuery;
+  }
+
+  public byte[] getSerializedKuduScanToken()
+  {
+    return serializedKuduScanToken;
+  }
+
+  public void setSerializedKuduScanToken(byte[] serializedKuduScanToken)
+  {
+    this.serializedKuduScanToken = serializedKuduScanToken;
+  }
+
+  public int getOrdinal()
+  {
+    return ordinal;
+  }
+
+  public void setOrdinal(int ordinal)
+  {
+    this.ordinal = ordinal;
+  }
+
+  public int getTotalSize()
+  {
+    return totalSize;
+  }
+
+  public void setTotalSize(int totalSize)
+  {
+    this.totalSize = totalSize;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof KuduPartitionScanAssignmentMeta)) {
+      return false;
+    }
+
+    KuduPartitionScanAssignmentMeta that = (KuduPartitionScanAssignmentMeta)o;
+
+    if (getOrdinal() != that.getOrdinal()) {
+      return false;
+    }
+    if (getTotalSize() != that.getTotalSize()) {
+      return false;
+    }
+    // Null-safe comparison: a freshly constructed instance has a null query and must not
+    // throw a NullPointerException when compared.
+    return getCurrentQuery() == null ? that.getCurrentQuery() == null
+        : getCurrentQuery().equals(that.getCurrentQuery());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // Null-safe: treat an unset query as contributing zero, mirroring equals().
+    int result = getCurrentQuery() == null ? 0 : getCurrentQuery().hashCode();
+    result = 31 * result + getOrdinal();
+    result = 31 * result + getTotalSize();
+    return result;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KuduPartitionScanAssignmentMeta{" +
+        "currentQuery='" + currentQuery + '\'' +
+        ", serializedKuduScanToken=" + Arrays.toString(serializedKuduScanToken) +
+        ", ordinal=" + ordinal +
+        ", totalSize=" + totalSize +
+        '}';
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallable.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallable.java
new file mode 100644
index 0000000..e664de2
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallable.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.ApexKuduConnection;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+
+import com.datatorrent.lib.util.PojoUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/***
+ * A callable implementation that is responsible for scanning all rows that are scannable from a given
+ * scan token range from a single Kudu tablet. The rows are sent to the input operator buffer so that they
+ * can be emitted as windows progress.
+ */
+public class KuduPartitionScannerCallable<T,C extends InputOperatorControlTuple> implements Callable<Long>
+{
+ private AbstractKuduInputOperator<T,C> operatorUsingThisScanner;
+
+ private KuduPartitionScanAssignmentMeta kuduPartitionScanAssignmentMeta;
+
+ private BlockingQueue<KuduRecordWithMeta<T>> bufferForTransmittingRecords;
+
+ private Class<T> clazzForResultObject;
+
+ private transient KuduClient kuduClientHandle;
+
+ private Map<String,ColumnSchema> tableSchema;
+
+ private Map<String,Object> settersForThisQueryScan;
+
+ private SQLToKuduPredicatesTranslator parsedQuery;
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(KuduPartitionScannerCallable.class);
+
+ public KuduPartitionScannerCallable(AbstractKuduInputOperator<T,C> kuduInputOperator,
+ KuduPartitionScanAssignmentMeta partitionMeta, ApexKuduConnection apexKuduConnection,Map<String,Object> setters,
+ SQLToKuduPredicatesTranslator parsedQueryInstance)
+ {
+ checkNotNull(kuduInputOperator,"Kudu operator instance cannot be null in the kudu scanner thread");
+ checkNotNull(partitionMeta, "Partition metadata cannot be null in kudu scanner thread");
+ checkNotNull(apexKuduConnection,"Kudu connection cannot be null in the kudu scanner thread");
+ checkNotNull(apexKuduConnection,"Setters cannot be null in the kudu scanner thread");
+ checkNotNull(parsedQueryInstance, "parsed Query instance cannot be null");
+ operatorUsingThisScanner = kuduInputOperator;
+ kuduPartitionScanAssignmentMeta = partitionMeta;
+ bufferForTransmittingRecords = kuduInputOperator.getBuffer();
+ clazzForResultObject = kuduInputOperator.getClazzForResultObject();
+ checkNotNull(apexKuduConnection,"Kudu connection cannot be null when initializing scanner");
+ kuduClientHandle = apexKuduConnection.getKuduClient();
+ checkNotNull(kuduClientHandle,"Kudu client cannot be null when initializing scanner");
+ tableSchema = kuduInputOperator.getKuduColNameToSchemaMapping();
+ settersForThisQueryScan = setters;
+ parsedQuery = parsedQueryInstance;
+ }
+
+ public void setValuesInPOJO(RowResult aRow, T payload)
+ {
+ Set<String> columnsUsed = parsedQuery.getKuduSQLParseTreeListener().getListOfColumnsUsed();
+ for (String aColumnName : columnsUsed) {
+ ColumnSchema schemaForThisColumn = tableSchema.get(aColumnName);
+ if (aRow.isNull(aColumnName)) {
+ continue;
+ }
+ switch ( schemaForThisColumn.getType().getDataType().getNumber()) {
+ case Common.DataType.BINARY_VALUE:
+ ((PojoUtils.Setter<T,ByteBuffer>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getBinary(aColumnName));
+ break;
+ case Common.DataType.STRING_VALUE:
+ ((PojoUtils.Setter<T,String>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getString(aColumnName));
+ break;
+ case Common.DataType.BOOL_VALUE:
+ ((PojoUtils.SetterBoolean<T>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getBoolean(aColumnName));
+ break;
+ case Common.DataType.DOUBLE_VALUE:
+ ((PojoUtils.SetterDouble<T>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getDouble(aColumnName));
+ break;
+ case Common.DataType.FLOAT_VALUE:
+ ((PojoUtils.SetterFloat<T>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getFloat(aColumnName));
+ break;
+ case Common.DataType.INT8_VALUE:
+ ((PojoUtils.SetterByte<T>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getByte(aColumnName));
+ break;
+ case Common.DataType.INT16_VALUE:
+ ((PojoUtils.SetterShort<T>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getShort(aColumnName));
+ break;
+ case Common.DataType.INT32_VALUE:
+ ((PojoUtils.SetterInt<T>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getInt(aColumnName));
+ break;
+ case Common.DataType.UNIXTIME_MICROS_VALUE:
+ case Common.DataType.INT64_VALUE:
+ ((PojoUtils.SetterLong<T>)settersForThisQueryScan.get(aColumnName)).set(
+ payload,aRow.getLong(aColumnName));
+ break;
+ case Common.DataType.UINT8_VALUE:
+ LOG.error("Unsigned int 8 not supported yet");
+ throw new RuntimeException("uint8 not supported in Kudu schema yet");
+ case Common.DataType.UINT16_VALUE:
+ LOG.error("Unsigned int 16 not supported yet");
+ throw new RuntimeException("uint16 not supported in Kudu schema yet");
+ case Common.DataType.UINT32_VALUE:
+ LOG.error("Unsigned int 32 not supported yet");
+ throw new RuntimeException("uint32 not supported in Kudu schema yet");
+ case Common.DataType.UINT64_VALUE:
+ LOG.error("Unsigned int 64 not supported yet");
+ throw new RuntimeException("uint64 not supported in Kudu schema yet");
+ case Common.DataType.UNKNOWN_DATA_VALUE:
+ LOG.error("unknown data type ( complex types ? ) not supported yet");
+ throw new RuntimeException("Unknown data type ( complex types ? ) not supported in Kudu schema yet");
+ default:
+ LOG.error("unknown type/default ( complex types ? ) not supported yet");
+ throw new RuntimeException("Unknown type/default ( complex types ? ) not supported in Kudu schema yet");
+ }
+ }
+ }
+
+ @Override
+ public Long call() throws Exception
+ {
+ long numRowsScanned = 0;
+ KuduScanner aPartitionSpecificScanner = KuduScanToken.deserializeIntoScanner(
+ kuduPartitionScanAssignmentMeta.getSerializedKuduScanToken(), kuduClientHandle);
+ LOG.info("Scanning the following tablet " + KuduScanToken.stringifySerializedToken(kuduPartitionScanAssignmentMeta
+ .getSerializedKuduScanToken(), kuduClientHandle));
+ KuduRecordWithMeta<T> beginScanRecord = new KuduRecordWithMeta<>();
+ beginScanRecord.setBeginScanMarker(true);
+ beginScanRecord.setTabletMetadata(kuduPartitionScanAssignmentMeta);
+ bufferForTransmittingRecords.add(beginScanRecord); // Add a record entry that denotes the end of this scan.
+ while ( aPartitionSpecificScanner.hasMoreRows()) {
+ LOG.debug("Number of columns being returned for this read " +
+ aPartitionSpecificScanner.getProjectionSchema().getColumnCount());
+ RowResultIterator resultIterator = aPartitionSpecificScanner.nextRows();
+ if (resultIterator == null) {
+ break;
+ } else {
+ while (resultIterator.hasNext()) {
+ KuduRecordWithMeta<T> recordWithMeta = new KuduRecordWithMeta<>();
+ RowResult aRow = resultIterator.next();
+ recordWithMeta.setPositionInScan(numRowsScanned);
+ T payload = clazzForResultObject.newInstance();
+ recordWithMeta.setThePayload(payload);
+ recordWithMeta.setEndOfScanMarker(false);
+ recordWithMeta.setTabletMetadata(kuduPartitionScanAssignmentMeta);
+ setValuesInPOJO(aRow,payload);
+ bufferForTransmittingRecords.add(recordWithMeta);
+ numRowsScanned += 1;
+ }
+ }
+ }
+ aPartitionSpecificScanner.close();
+ KuduRecordWithMeta<T> endScanRecord = new KuduRecordWithMeta<>();
+ endScanRecord.setEndOfScanMarker(true);
+ endScanRecord.setTabletMetadata(kuduPartitionScanAssignmentMeta);
+ bufferForTransmittingRecords.add(endScanRecord); // Add a record entry that denotes the end of this scan.
+ LOG.info(" Scanned a total of " + numRowsScanned + " for this scanner thread @tablet " +
+ KuduScanToken.stringifySerializedToken(kuduPartitionScanAssignmentMeta.getSerializedKuduScanToken(),
+ kuduClientHandle));
+ return numRowsScanned;
+ }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduRecordWithMeta.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduRecordWithMeta.java
new file mode 100644
index 0000000..77b7314
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduRecordWithMeta.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+/**
+ * Represents a Kudu row and metadata for the row that was consumed by the Kudu scanner client.
+ * The metadata includes information like the ordinal position for this scan instance, the query and the scan
+ * token was scheduled.
+ */
+public class KuduRecordWithMeta<T>
+{
+ private T thePayload;
+
+ private long positionInScan;
+
+ private boolean isEndOfScanMarker;
+
+ private boolean isBeginScanMarker;
+
+ private KuduPartitionScanAssignmentMeta tabletMetadata;
+
+
+ public T getThePayload()
+ {
+ return thePayload;
+ }
+
+ public void setThePayload(T thePayload)
+ {
+ this.thePayload = thePayload;
+ }
+
+ public long getPositionInScan()
+ {
+ return positionInScan;
+ }
+
+ public void setPositionInScan(long positionInScan)
+ {
+ this.positionInScan = positionInScan;
+ }
+
+ public boolean isEndOfScanMarker()
+ {
+ return isEndOfScanMarker;
+ }
+
+ public void setEndOfScanMarker(boolean endOfScanMarker)
+ {
+ isEndOfScanMarker = endOfScanMarker;
+ }
+
+ public KuduPartitionScanAssignmentMeta getTabletMetadata()
+ {
+ return tabletMetadata;
+ }
+
+ public void setTabletMetadata(KuduPartitionScanAssignmentMeta tabletMetadata)
+ {
+ this.tabletMetadata = tabletMetadata;
+ }
+
+ public boolean isBeginScanMarker()
+ {
+ return isBeginScanMarker;
+ }
+
+ public void setBeginScanMarker(boolean beginScanMarker)
+ {
+ isBeginScanMarker = beginScanMarker;
+ }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduScanOrderStrategy.java
similarity index 74%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduScanOrderStrategy.java
index 64b46c6..130a8ef 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/KuduScanOrderStrategy.java
@@ -16,17 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu.scanner;
/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+ * Used to define the scan order strategy when multiple scan tokens are assigned to a single Apex partition
*/
-public enum KuduMutationType
+public enum KuduScanOrderStrategy
{
-
- INSERT,
- DELETE,
- UPDATE,
- UPSERT
+ CONSISTENT_ORDER_SCANNER,
+ RANDOM_ORDER_SCANNER
}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/package-info.java
similarity index 51%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/package-info.java
index 64b46c6..6904a7d 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/scanner/package-info.java
@@ -16,17 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
-
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+/***
+ * The scanner package holds classes responsible for scanning Kudu tablets as per the provided predicates. There are two
+ * types of scanner model provided. {@link org.apache.apex.malhar.kudu.scanner.KuduPartitionConsistentOrderScanner}
+ * provides for a consistent ordering of tuples. This is to be used when exactly once processing semantics are
+ * required in the downstream operators. The second model,
+ * {@link org.apache.apex.malhar.kudu.scanner.KuduPartitionRandomOrderScanner}, provides a throughput-oriented
+ * implementation and cannot be used when exactly-once semantics are required in the downstream operators.
*/
-public enum KuduMutationType
-{
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.kudu.scanner;
- INSERT,
- DELETE,
- UPDATE,
- UPSERT
-}
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLExpressionErrorListener.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLExpressionErrorListener.java
new file mode 100644
index 0000000..5ff08c5
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLExpressionErrorListener.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.antlr.v4.runtime.BaseErrorListener;
+import org.antlr.v4.runtime.RecognitionException;
+import org.antlr.v4.runtime.Recognizer;
+
+/**
+ * A simple error listener that is plugged into the Kudu expression parser
+ */
+public class KuduSQLExpressionErrorListener extends BaseErrorListener
+{
+ private boolean syntaxError = false;
+
+ private List<String> listOfErrorMessages = new ArrayList<>();
+
+ @Override
+ public void syntaxError(Recognizer<?, ?> recognizer, Object offendingSymbol, int line, int charPositionInLine,
+ String msg, RecognitionException e)
+ {
+ super.syntaxError(recognizer, offendingSymbol, line, charPositionInLine, msg, e);
+ syntaxError = true;
+ listOfErrorMessages.add(msg);
+ }
+
+ public boolean isSyntaxError()
+ {
+ return syntaxError;
+ }
+
+ public void setSyntaxError(boolean syntaxError)
+ {
+ this.syntaxError = syntaxError;
+ }
+
+ public List<String> getListOfErrorMessages()
+ {
+ return listOfErrorMessages;
+ }
+
+ public void setListOfErrorMessages(List<String> listOfErrorMessages)
+ {
+ this.listOfErrorMessages = listOfErrorMessages;
+ }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParseTreeListener.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParseTreeListener.java
new file mode 100644
index 0000000..a1e0ec5
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParseTreeListener.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.antlr.v4.runtime.tree.ParseTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.sqlparser.KuduSQLExpressionBaseListener;
+import org.apache.apex.malhar.kudu.sqlparser.KuduSQLExpressionParser;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduPredicate;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class builds Kudu equivalent scanner logic from a supplied SQL expression.
+ * Note that the SQL expression is set in {@code KuduSQLParser}
+ * The parser class uses a "tree walker" to walk the parsed tree and while doing so registers this class
+ * This listener then start building the Kudu scanner constructs as it listens to the tree walk steps.
+ * Note that it only overrides required methods from its base class {@code KuduSQLExpressionBaseListener}
+ * The KuduSQLExpressionBaseListener class is an autogenerated class by the Antlr4 compiler
+ * */
+public class KuduSQLParseTreeListener extends KuduSQLExpressionBaseListener
+{
  // Set to false whenever any part of the expression fails validation against the table schema.
  private boolean isSuccessfullyParsed = true;

  // True when the SQL expression contains a where clause.
  private boolean isFilterExpressionEnabled = false;

  // True when the SQL expression contains a with-options clause.
  private boolean isOptionsEnabled = false;

  // Kudu column names referenced in the select projection.
  private Set<String> listOfColumnsUsed = new HashSet<>();

  // Kudu column name to the POJO field name it is scanned as; defaults to the column name itself.
  private Map<String,String> aliases = new HashMap<>();

  // Options collected from the expression, keyed by option name.
  private Map<String,String> optionsUsed = new HashMap<>();

  // Kudu predicates built from the where-clause expressions.
  private List<KuduPredicate> kuduPredicateList = new ArrayList<>();

  // Column name to its Kudu schema definition for the table being scanned.
  private Map<String,ColumnSchema> columnSchemaLookups = new HashMap<>();

  // Table name extracted from the table clause of the expression.
  private String tableName = null;

  // True when the projection is a "select *" expression.
  private boolean isSelectStarExpressionEnabled = false;

  private static final transient Logger LOG = LoggerFactory.getLogger(KuduSQLParseTreeListener.class);

  // NOTE(review): presumably populated from the with-options clause (see CONTROLTUPLE_MESSAGE) — confirm.
  private String controlTupleMessage = null;

  // NOTE(review): presumably populated from the with-options clause (see READ_SNAPSHOT_TIME) — confirm.
  private Long readSnapshotTime = null;

  public static final String READ_SNAPSHOT_TIME = "read_snapshot_time";

  public static final String CONTROLTUPLE_MESSAGE = "controltuple_message";

  /** Resets all state accumulated from a previous parse so the listener can be reused. */
  private void clearAll()
  {
    listOfColumnsUsed.clear();
    aliases.clear();
    optionsUsed.clear();
    kuduPredicateList.clear();
    tableName = null;
    columnSchemaLookups.clear();
  }

  /**
   * Registers the column schemas of the Kudu table the expression is validated against, and seeds
   * the alias map so that by default each column maps to a POJO field of the same name.
   * @param listOfColumnsForCurrentTable column definitions of the current Kudu table; must not be null
   */
  public void setColumnSchemaList(List<ColumnSchema> listOfColumnsForCurrentTable)
  {
    Preconditions.checkNotNull(listOfColumnsForCurrentTable,"Column schemas " +
        "cannot be null for kudu table");
    for (ColumnSchema aColumnDef : listOfColumnsForCurrentTable) {
      columnSchemaLookups.put(aColumnDef.getName(),aColumnDef);
      aliases.put(aColumnDef.getName(),aColumnDef.getName()); // By default each column is its own alias in POJO.
    }
  }
+
+ @Override
+ public void exitKudusqlexpression(KuduSQLExpressionParser.KudusqlexpressionContext ctx)
+ {
+ LOG.debug(" Scanning " + listOfColumnsUsed.size() + " columns from " + tableName + " table");
+ LOG.debug(" Number of predicates that are part of the scan are " + kuduPredicateList.size());
+ LOG.debug(" Select * expression enabled " + isSelectStarExpressionEnabled);
+ LOG.debug(" Filter expression enabled " + isFilterExpressionEnabled);
+ LOG.debug(" Are options being enabled " + isOptionsEnabled);
+ LOG.debug(" Read snapshot time being used as " + readSnapshotTime);
+ LOG.debug(" Control tuple message being used as " + controlTupleMessage);
+ }
+
  /**
   * Handles a projection entry that is a bare column name (no alias). Validates the name against
   * the table schema and registers it for scanning; marks the parse as failed for unknown columns.
   */
  @Override
  public void exitSELECT_ID_ONLY_USED_AS_COLUMN_NAME(KuduSQLExpressionParser.SELECT_ID_ONLY_USED_AS_COLUMN_NAMEContext
      ctx)
  {
    super.exitSELECT_ID_ONLY_USED_AS_COLUMN_NAME(ctx);
    KuduSQLExpressionParser.IdorcolumnnameContext ctxForIDOrColumnName = ctx.idorcolumnname();
    if (ctxForIDOrColumnName != null ) { // we can ignore complex expressions as they are resolved recursively
      // We are not inside a complex comma separated expression
      String columnName = extractColumnNameFromContext(ctxForIDOrColumnName);
      if ( columnName != null) {
        if (columnSchemaLookups.containsKey(columnName)) {
          LOG.debug(" Kudu column being enabled for scanning " + columnName );
          aliases.put(columnName,columnName);
          listOfColumnsUsed.add(columnName);
        } else {
          LOG.error("Invalid column being set as a scanner ( column names are case sensitive ). " + columnName);
          isSuccessfullyParsed = false;
        }
      }
    }
  }


  /** Handles a "select *" projection by enabling every column of the table for scanning. */
  @Override
  public void exitALL_COLUMNS_SELECT_EXP(KuduSQLExpressionParser.ALL_COLUMNS_SELECT_EXPContext ctx)
  {
    super.exitALL_COLUMNS_SELECT_EXP(ctx);
    listOfColumnsUsed.addAll(columnSchemaLookups.keySet());
    isSelectStarExpressionEnabled = true;
  }

  /**
   * Extracts the column name from an id-or-columnname parse context. Non-reserved names come from
   * the ID token directly; otherwise the terminal children are walked to find the quoted reserved
   * keyword used as a column name. Returns null when no name can be extracted.
   */
  private String extractColumnNameFromContext(KuduSQLExpressionParser.IdorcolumnnameContext ctxForIDOrColumnName)
  {
    if (ctxForIDOrColumnName.ID() != null) {
      // we are dealing with a non-reserved keyword as a columnname.
      return ctxForIDOrColumnName.ID().getSymbol().getText();
    } else {
      for (int i = 0; i < ctxForIDOrColumnName.getChildCount(); i++) {
        // iterate all the terminal node children to see which keyword is used. Note that there are spaces possible
        ParseTree terminalNodeTree = ctxForIDOrColumnName.getChild(i);
        String childNodeText = terminalNodeTree.getText();
        if ( (!childNodeText.equalsIgnoreCase(" ")) &&
            (!childNodeText.equalsIgnoreCase("'"))
        ) {
          return childNodeText; // Anything other than a quote or whitespace is the column name ( reserved )
        }
      } // end for loop for terminal node children
    } // conditional for non-ID column names
    return null;
  }
+
  /**
   * Handles a projection entry of the form "column as alias". Registers the alias so the scanned
   * column value is set on the POJO field named by the alias; fails the parse on unknown columns.
   */
  @Override
  public void exitSELECT_ALIAS_USED(KuduSQLExpressionParser.SELECT_ALIAS_USEDContext ctx)
  {
    super.exitSELECT_ALIAS_USED(ctx);
    KuduSQLExpressionParser.IdorcolumnnameContext ctxForIDOrColumnName = ctx.idorcolumnname();
    if (ctxForIDOrColumnName != null ) { // we can ignore complex expressions as they are resolved recursively
      // We are not inside a complex comma separated expression
      String columnName = extractColumnNameFromContext(ctxForIDOrColumnName);
      if ( columnName != null) {
        if (columnSchemaLookups.containsKey(columnName)) {
          LOG.debug(" Kudu column being enabled for scanning " + columnName );
          LOG.debug(columnName + " is being scanned as " + ctx.ID().getSymbol().getText());
          aliases.put(columnName,ctx.ID().getSymbol().getText());
          listOfColumnsUsed.add(columnName);
        } else {
          LOG.error("Invalid column being set as a scanner ( column names are case sensitive ). " + columnName);
          isSuccessfullyParsed = false;
        }
      }
    } // end null check only for id or column names
  }

  /** Captures the table name from the FROM clause of the expression. */
  @Override
  public void exitTableclause(KuduSQLExpressionParser.TableclauseContext ctx)
  {
    super.exitTableclause(ctx);
    tableName = ctx.ID().getText();
  }

  /** Records that the expression contains a WHERE clause. */
  @Override
  public void exitWhereclause(KuduSQLExpressionParser.WhereclauseContext ctx)
  {
    super.exitWhereclause(ctx);
    isFilterExpressionEnabled = true;
  }

  /** Records that the expression contains a WITH OPTIONS clause. */
  @Override
  public void exitWithoptionsclause(KuduSQLExpressionParser.WithoptionsclauseContext ctx)
  {
    super.exitWithoptionsclause(ctx);
    isOptionsEnabled = true;
  }
+
  /**
   * Builds a Kudu comparison predicate for the given column, operator and literal value. The
   * literal's parse-tree type (bool/double/float/string/number) must match the column's Kudu type;
   * on a mismatch the parse is marked failed and null is returned. String literals additionally
   * match binary columns (compared as the literal's bytes). Numeric literals cover all signed and
   * unsigned integer widths plus unixtime-micros, compared as longs.
   * @return the predicate, or null when the value type does not match the column type
   */
  private KuduPredicate buildKuduPredicate(String columnName,KuduPredicate.ComparisonOp comparisonOp,
      KuduSQLExpressionParser.AnyvalueContext anyvalueContext)
  {
    ColumnSchema thisColumnSchema = columnSchemaLookups.get(columnName);
    if (anyvalueContext.bool() != null) {
      if (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.BOOL_VALUE)) {
        LOG.error(" Mismatched data type for column " + columnName);
        isSuccessfullyParsed = false;
        return null;
      }
      return KuduPredicate.newComparisonPredicate(thisColumnSchema,comparisonOp,
          Boolean.valueOf(anyvalueContext.bool().getText().toLowerCase()));
    }
    if (anyvalueContext.doubleval() != null) {
      if (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.DOUBLE_VALUE)) {
        LOG.error(" Mismatched data type for column (Ensure doubles are appended with letter d)" + columnName);
        isSuccessfullyParsed = false;
        return null;
      }
      return KuduPredicate.newComparisonPredicate(thisColumnSchema,comparisonOp,
          Double.valueOf(anyvalueContext.doubleval().getText()));
    }
    if (anyvalueContext.floatval() != null) {
      if (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.FLOAT_VALUE)) {
        LOG.error(" Mismatched data type for column (Ensure doubles are appended with letter f)" + columnName);
        isSuccessfullyParsed = false;
        return null;
      }
      return KuduPredicate.newComparisonPredicate(thisColumnSchema,comparisonOp,
          Float.valueOf(anyvalueContext.floatval().getText()));
    }
    if (anyvalueContext.stringval() != null) {
      // String literals are valid against both string and binary columns.
      if ( (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.STRING_VALUE)) &&
          (thisColumnSchema.getType().getDataType().getNumber() != (Common.DataType.BINARY_VALUE))) {
        LOG.error(" Mismatched data type for column ( Has to be a string or a binary value enclosed in" +
            " double quotes" + columnName);
        isSuccessfullyParsed = false;
        return null;
      }
      if (thisColumnSchema.getType().getDataType().getNumber() == (Common.DataType.STRING_VALUE)) {
        return KuduPredicate.newComparisonPredicate(thisColumnSchema, comparisonOp,
            anyvalueContext.stringval().getText());
      }
      if (thisColumnSchema.getType().getDataType().getNumber() == (Common.DataType.BINARY_VALUE)) {
        return KuduPredicate.newComparisonPredicate(thisColumnSchema, comparisonOp,
            anyvalueContext.stringval().getText().getBytes());
      }
    }
    if (anyvalueContext.numval() != null) {
      int dataTypeNumberForKuduCol = thisColumnSchema.getType().getDataType().getNumber();
      // Any integral Kudu type (signed, unsigned, or unixtime-micros) accepts a numeric literal.
      if (
          (dataTypeNumberForKuduCol != (Common.DataType.UNIXTIME_MICROS_VALUE)) &&
          (dataTypeNumberForKuduCol != (Common.DataType.INT8_VALUE)) &&
          (dataTypeNumberForKuduCol != (Common.DataType.INT16_VALUE)) &&
          (dataTypeNumberForKuduCol != (Common.DataType.INT32_VALUE)) &&
          (dataTypeNumberForKuduCol != (Common.DataType.INT64_VALUE)) &&
          (dataTypeNumberForKuduCol != (Common.DataType.UINT8_VALUE)) &&
          (dataTypeNumberForKuduCol != (Common.DataType.UINT16_VALUE)) &&
          (dataTypeNumberForKuduCol != (Common.DataType.UINT32_VALUE)) &&
          (dataTypeNumberForKuduCol != (Common.DataType.UINT64_VALUE))
      ) {
        LOG.error(" Mismatched data type for column " + columnName);
        isSuccessfullyParsed = false;
        return null;
      }
      return KuduPredicate.newComparisonPredicate(thisColumnSchema,comparisonOp,
          Long.valueOf(anyvalueContext.numval().getText()));
    }
    return null;
  }
+
  /**
   * Builds the Kudu predicate for a comparison filter (=, &lt;, &gt;, &lt;=, &gt;=) in the WHERE clause.
   * NOTE(review): buildKuduPredicate returns null on a type mismatch, in which case a null entry is
   * added to kuduPredicateList — confirm downstream consumers tolerate null predicates.
   */
  @Override
  public void exitFILTER_COMPARISION_EXP(KuduSQLExpressionParser.FILTER_COMPARISION_EXPContext ctx)
  {
    super.exitFILTER_COMPARISION_EXP(ctx);
    String columnName = extractColumnNameFromContext(ctx.idorcolumnname());
    if (columnSchemaLookups.containsKey(columnName)) {
      if (ctx.comparisionoperator().EQUAL_TO() != null) {
        kuduPredicateList.add(buildKuduPredicate(columnName,KuduPredicate.ComparisonOp.EQUAL,ctx.anyvalue()));
      }
      if (ctx.comparisionoperator().GREATER_THAN() != null) {
        kuduPredicateList.add(buildKuduPredicate(columnName,KuduPredicate.ComparisonOp.GREATER,ctx.anyvalue()));
      }
      if (ctx.comparisionoperator().LESSER_THAN() != null) {
        kuduPredicateList.add(buildKuduPredicate(columnName,KuduPredicate.ComparisonOp.LESS,ctx.anyvalue()));
      }
      if (ctx.comparisionoperator().GREATER_THAN_OR_EQUAL() != null) {
        kuduPredicateList.add(buildKuduPredicate(columnName,KuduPredicate.ComparisonOp.GREATER_EQUAL,ctx.anyvalue()));
      }
      if (ctx.comparisionoperator().LESSER_THAN_OR_EQUAL() != null) {
        kuduPredicateList.add(buildKuduPredicate(columnName,KuduPredicate.ComparisonOp.LESS_EQUAL,ctx.anyvalue()));
      }
    } else {
      LOG.error(columnName + " is not a valid column name for this kudu table");
      isSuccessfullyParsed = false;
    }
  }

  /** Builds an IS NULL predicate for the referenced column; fails the parse on unknown columns. */
  @Override
  public void exitIS_NULL_FILTER_EXP(KuduSQLExpressionParser.IS_NULL_FILTER_EXPContext ctx)
  {
    super.exitIS_NULL_FILTER_EXP(ctx);
    String columnName = extractColumnNameFromContext(ctx.idorcolumnname());
    if (columnSchemaLookups.containsKey(columnName)) {
      kuduPredicateList.add(KuduPredicate.newIsNullPredicate(columnSchemaLookups.get(columnName)));
    } else {
      LOG.error(columnName + " is not a valid column name for this kudu table");
      isSuccessfullyParsed = false;
    }

  }
+
+ private List<Boolean> extractListOfBools(KuduSQLExpressionParser.ListofboolsContext listofboolsContext,
+ String columnName)
+ {
+ List<Boolean> returnList = new ArrayList<>();
+ for (KuduSQLExpressionParser.BoolContext aBool: listofboolsContext.bool()) {
+ returnList.add(Boolean.valueOf(aBool.getText().toLowerCase()));
+ }
+ if (returnList.size() == 0) {
+ LOG.error("Empty list of booleans specified for IN expression for column filter " + columnName);
+ isSuccessfullyParsed = false;
+ }
+ return returnList;
+ }
+
+
+ private List<Double> extractListOfDoubles(KuduSQLExpressionParser.ListofdoublesContext listofdoubles,
+ String columnName)
+ {
+ List<Double> returnList = new ArrayList<>();
+ for (KuduSQLExpressionParser.DoublevalContext aDouble: listofdoubles.doubleval()) {
+ returnList.add(Double.valueOf(aDouble.getText()));
+ }
+ if (returnList.size() == 0) {
+ LOG.error("Empty list of doubles specified for IN expression for column filter " + columnName);
+ isSuccessfullyParsed = false;
+ }
+ return returnList;
+ }
+
+  /**
+   * Extracts the float literals of an IN ( ... ) list expression.
+   * Flags the overall parse as failed when the list is empty because an IN predicate
+   * cannot be built from an empty value list.
+   * @param listoffloatsContext parse tree node holding the float literals
+   * @param columnName column the IN filter applies to (used only for error reporting)
+   * @return the parsed floats; possibly empty when the expression had no values
+   */
+  private List<Float> extractListOfFloats(KuduSQLExpressionParser.ListoffloatsContext listoffloatsContext,
+      String columnName)
+  {
+    List<Float> returnList = new ArrayList<>();
+    for (KuduSQLExpressionParser.FloatvalContext aFloat: listoffloatsContext.floatval()) {
+      returnList.add(Float.valueOf(aFloat.getText()));
+    }
+    if (returnList.isEmpty()) {
+      LOG.error("Empty list of floats specified for IN expression for column filter {}", columnName);
+      isSuccessfullyParsed = false;
+    }
+    return returnList;
+  }
+
+
+  /**
+   * Extracts the integral literals of an IN ( ... ) list expression as longs.
+   * Flags the overall parse as failed when the list is empty because an IN predicate
+   * cannot be built from an empty value list.
+   * @param listofnumsContext parse tree node holding the numeric literals
+   * @param columnName column the IN filter applies to (used only for error reporting)
+   * @return the parsed longs; possibly empty when the expression had no values
+   */
+  private List<Long> extractListOfLongs(KuduSQLExpressionParser.ListofnumsContext listofnumsContext,
+      String columnName)
+  {
+    List<Long> returnList = new ArrayList<>();
+    for (KuduSQLExpressionParser.NumvalContext aLong: listofnumsContext.numval()) {
+      returnList.add(Long.valueOf(aLong.getText()));
+    }
+    if (returnList.isEmpty()) {
+      LOG.error("Empty list of longs specified for IN expression for column filter {}", columnName);
+      isSuccessfullyParsed = false;
+    }
+    return returnList;
+  }
+
+
+  /**
+   * Extracts the string literals of an IN ( ... ) list expression.
+   * Flags the overall parse as failed when the list is empty because an IN predicate
+   * cannot be built from an empty value list.
+   * @param listofstringsContext parse tree node holding the string literals
+   * @param columnName column the IN filter applies to (used only for error reporting)
+   * @return the parsed strings; possibly empty when the expression had no values
+   */
+  private List<String> extractListOfStrings(KuduSQLExpressionParser.ListofstringsContext listofstringsContext,
+      String columnName)
+  {
+    List<String> returnList = new ArrayList<>();
+    for (KuduSQLExpressionParser.StringvalContext aString: listofstringsContext.stringval()) {
+      // NOTE(review): getText() returns the raw token text — confirm whether quote
+      // characters are stripped by the grammar before reaching this point.
+      returnList.add(aString.getText());
+    }
+    if (returnList.isEmpty()) {
+      LOG.error("Empty list of Strings specified for IN expression for column filter {}", columnName);
+      isSuccessfullyParsed = false;
+    }
+    return returnList;
+  }
+
+
+  /**
+   * Extracts the string literals of an IN ( ... ) list expression as binary values,
+   * used when the filtered column is of Kudu BINARY type.
+   * Flags the overall parse as failed when the list is empty because an IN predicate
+   * cannot be built from an empty value list.
+   * @param listofstringsContext parse tree node holding the string literals
+   * @param columnName column the IN filter applies to (used only for error reporting)
+   * @return the literals encoded as byte arrays; possibly empty
+   */
+  private List<byte[]> extractListOfBinary(KuduSQLExpressionParser.ListofstringsContext listofstringsContext,
+      String columnName)
+  {
+    List<byte[]> returnList = new ArrayList<>();
+    for (KuduSQLExpressionParser.StringvalContext aString: listofstringsContext.stringval()) {
+      // NOTE(review): getBytes() uses the platform default charset, so the resulting
+      // predicate bytes can differ between JVMs — consider an explicit charset; confirm intent.
+      returnList.add(aString.getText().getBytes());
+    }
+    if (returnList.isEmpty()) {
+      LOG.error("Empty list of binaries specified for IN expression for column filter {}", columnName);
+      isSuccessfullyParsed = false;
+    }
+    return returnList;
+  }
+
+
+
+ @Override
+ public void exitIN_FILTER_EXP(KuduSQLExpressionParser.IN_FILTER_EXPContext ctx)
+ {
+ super.exitIN_FILTER_EXP(ctx);
+ String columnName = extractColumnNameFromContext(ctx.idorcolumnname());
+ if (columnSchemaLookups.containsKey(columnName)) {
+ KuduSQLParser.ListofanyvalueContext listofanyvalueContext = ctx.listofanyvalue();
+ // sort out an IN predicate for a list of bools
+ KuduSQLExpressionParser.ListofboolsContext listofboolsContext = listofanyvalueContext.listofbools();
+ if (listofboolsContext != null) {
+ kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+ extractListOfBools(listofboolsContext,columnName)));
+ }
+
+ // sort out an IN predicate for a list of doubles
+ KuduSQLExpressionParser.ListofdoublesContext listofdoubles = listofanyvalueContext.listofdoubles();
+ if (listofdoubles != null) {
+ kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+ extractListOfDoubles(listofdoubles,columnName)));
+ }
+
+ // sort out an IN predicate for a list of floats
+ KuduSQLExpressionParser.ListoffloatsContext listoffloatsContext = listofanyvalueContext.listoffloats();
+ if (listoffloatsContext != null) {
+ kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+ extractListOfFloats(listoffloatsContext,columnName)));
+ }
+
+ // sort out an IN predicate for a list of longs
+ KuduSQLExpressionParser.ListofnumsContext listofnumsContext = listofanyvalueContext.listofnums();
+ if (listofnumsContext != null) {
+ kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+ extractListOfLongs(listofnumsContext,columnName)));
+ }
+
+ // sort out an IN predicate for a list of strings or binary
+ KuduSQLExpressionParser.ListofstringsContext listofstringsContext = listofanyvalueContext.listofstrings();
+ if (listofstringsContext != null) {
+ if (columnSchemaLookups.get(columnName).getType().getDataType().getNumber() == Common.DataType.STRING_VALUE) {
+ kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+ extractListOfStrings(listofstringsContext,columnName)));
+ }
+ if (columnSchemaLookups.get(columnName).getType().getDataType().getNumber() == Common.DataType.STRING_VALUE) {
+ kuduPredicateList.add(KuduPredicate.newInListPredicate(columnSchemaLookups.get(columnName),
+ extractListOfBinary(listofstringsContext,columnName)));
+ }
+ }
+
+ } else {
+ LOG.error(columnName + " is not a valid column name to be used in a where clause for this kudu table");
+ isSuccessfullyParsed = false;
+ }
+ }
+
+  /**
+   * Parse tree callback for an "IS NOT NULL" filter expression. Registers an IS NOT NULL
+   * Kudu predicate for the referenced column, or flags the whole parse as failed when the
+   * column does not exist in the table schema.
+   */
+  @Override
+  public void exitIS_NOT_NULL_FILTER_EXP(KuduSQLExpressionParser.IS_NOT_NULL_FILTER_EXPContext ctx)
+  {
+    super.exitIS_NOT_NULL_FILTER_EXP(ctx);
+    String columnName = extractColumnNameFromContext(ctx.idorcolumnname());
+    if (!columnSchemaLookups.containsKey(columnName)) {
+      LOG.error(columnName + " is not a valid column name for this kudu table");
+      isSuccessfullyParsed = false;
+      return;
+    }
+    kuduPredicateList.add(KuduPredicate.newIsNotNullPredicate(columnSchemaLookups.get(columnName)));
+  }
+
+ // Parse tree callback for the CONTROLTUPLE_MESSAGE option clause: caches the configured
+ // control tuple message and records the option usage for later inspection.
+ // NOTE(review): getText() on the STRINGVAL token may include the surrounding quote
+ // characters — confirm downstream consumers of controlTupleMessage expect that.
+ @Override
+ public void exitSET_CONTROL_TUPLE_MSG(KuduSQLExpressionParser.SET_CONTROL_TUPLE_MSGContext ctx)
+ {
+ super.exitSET_CONTROL_TUPLE_MSG(ctx);
+ controlTupleMessage = ctx.STRINGVAL().getText();
+ optionsUsed.put(CONTROLTUPLE_MESSAGE,controlTupleMessage);
+ }
+
+  /**
+   * Parse tree callback for the READ_SNAPSHOT_TIME option clause: caches the configured
+   * snapshot time and records the option usage for later inspection.
+   */
+  @Override
+  public void exitSET_READ_SNAPSHOT_TIME(KuduSQLExpressionParser.SET_READ_SNAPSHOT_TIMEContext ctx)
+  {
+    super.exitSET_READ_SNAPSHOT_TIME(ctx);
+    String snapshotTimeText = ctx.INT().getText();
+    readSnapshotTime = Long.valueOf(snapshotTimeText);
+    optionsUsed.put(READ_SNAPSHOT_TIME, "" + readSnapshotTime);
+  }
+
+ // ---------------------------------------------------------------------------------------
+ // Accessors for the parse results. Callers first check isSuccessfullyParsed() and then
+ // read the table name, predicate list and option values extracted from the SQL expression.
+ // ---------------------------------------------------------------------------------------
+
+ // True only when every column reference and value list in the expression was valid.
+ public boolean isSuccessfullyParsed()
+ {
+ return isSuccessfullyParsed;
+ }
+
+ public void setSuccessfullyParsed(boolean successfullyParsed)
+ {
+ isSuccessfullyParsed = successfullyParsed;
+ }
+
+ // Column names referenced in the SELECT projection (after alias resolution).
+ public Set<String> getListOfColumnsUsed()
+ {
+ return listOfColumnsUsed;
+ }
+
+ public void setListOfColumnsUsed(Set<String> listOfColumnsUsed)
+ {
+ this.listOfColumnsUsed = listOfColumnsUsed;
+ }
+
+ // Mapping of column name to its alias as declared in the SELECT clause.
+ public Map<String, String> getAliases()
+ {
+ return aliases;
+ }
+
+ public void setAliases(Map<String, String> aliases)
+ {
+ this.aliases = aliases;
+ }
+
+ // Option keys (e.g. control tuple message, read snapshot time) and their raw values.
+ public Map<String, String> getOptionsUsed()
+ {
+ return optionsUsed;
+ }
+
+ public void setOptionsUsed(Map<String, String> optionsUsed)
+ {
+ this.optionsUsed = optionsUsed;
+ }
+
+ // Kudu predicates built from the WHERE clause filters.
+ public List<KuduPredicate> getKuduPredicateList()
+ {
+ return kuduPredicateList;
+ }
+
+ public void setKuduPredicateList(List<KuduPredicate> kuduPredicateList)
+ {
+ this.kuduPredicateList = kuduPredicateList;
+ }
+
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ public void setTableName(String tableName)
+ {
+ this.tableName = tableName;
+ }
+
+ // True when the projection was a SELECT * expression.
+ public boolean isSelectStarExpressionEnabled()
+ {
+ return isSelectStarExpressionEnabled;
+ }
+
+ public void setSelectStarExpressionEnabled(boolean selectStarExpressionEnabled)
+ {
+ isSelectStarExpressionEnabled = selectStarExpressionEnabled;
+ }
+
+ // True when the expression contained a WHERE clause.
+ public boolean isFilterExpressionEnabled()
+ {
+ return isFilterExpressionEnabled;
+ }
+
+ public void setFilterExpressionEnabled(boolean filterExpressionEnabled)
+ {
+ isFilterExpressionEnabled = filterExpressionEnabled;
+ }
+
+ // True when the expression contained an options clause.
+ public boolean isOptionsEnabled()
+ {
+ return isOptionsEnabled;
+ }
+
+ public void setOptionsEnabled(boolean optionsEnabled)
+ {
+ isOptionsEnabled = optionsEnabled;
+ }
+
+ public String getControlTupleMessage()
+ {
+ return controlTupleMessage;
+ }
+
+ public void setControlTupleMessage(String controlTupleMessage)
+ {
+ this.controlTupleMessage = controlTupleMessage;
+ }
+
+ // Snapshot time for READ_AT_SNAPSHOT scans; null when the option was not specified.
+ public Long getReadSnapshotTime()
+ {
+ return readSnapshotTime;
+ }
+
+ public void setReadSnapshotTime(Long readSnapshotTime)
+ {
+ this.readSnapshotTime = readSnapshotTime;
+ }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParser.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParser.java
new file mode 100644
index 0000000..acb221e
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/KuduSQLParser.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import org.antlr.v4.runtime.TokenStream;
+
+import org.apache.apex.malhar.kudu.sqlparser.KuduSQLExpressionParser;
+
+/**
+ * A thin convenience wrapper around the ANTLR generated {@code KuduSQLExpressionParser}.
+ * It swaps the default console error listeners for a {@link KuduSQLExpressionErrorListener}
+ * so that syntax errors can be inspected programmatically after a parse run.
+ */
+public class KuduSQLParser extends KuduSQLExpressionParser
+{
+  private KuduSQLExpressionErrorListener kuduSQLExpressionErrorListener = new KuduSQLExpressionErrorListener();
+
+  public KuduSQLParser(TokenStream input)
+  {
+    super(input);
+    // Drop the default console listeners and route all syntax errors to our own listener.
+    removeErrorListeners();
+    addErrorListener(kuduSQLExpressionErrorListener);
+  }
+
+  public KuduSQLExpressionErrorListener getKuduSQLExpressionErrorListener()
+  {
+    return kuduSQLExpressionErrorListener;
+  }
+
+  public void setKuduSQLExpressionErrorListener(KuduSQLExpressionErrorListener kuduSQLExpressionErrorListener)
+  {
+    this.kuduSQLExpressionErrorListener = kuduSQLExpressionErrorListener;
+  }
+}
diff --git a/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslator.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslator.java
new file mode 100644
index 0000000..e34f381
--- /dev/null
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslator.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import java.util.List;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.tree.ParseTree;
+import org.antlr.v4.runtime.tree.ParseTreeWalker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.sqlparser.KuduSQLExpressionLexer;
+import org.apache.kudu.ColumnSchema;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A class responsible for parsing an SQL expression and return parsed Kudu Client equivalent objects.
+ * Parsing happens eagerly in the constructor; callers inspect the resulting
+ * {@code KuduSQLParseTreeListener} and error listener for the outcome.
+ * This class is not Thread safe.
+ */
+public class SQLToKuduPredicatesTranslator
+{
+ private static final transient Logger LOG = LoggerFactory.getLogger(SQLToKuduPredicatesTranslator.class);
+
+ // The raw SQL expression supplied by the caller.
+ private String sqlExpresssion = null;
+
+ // Collects syntax errors raised while parsing; populated during parseKuduExpression().
+ private KuduSQLExpressionErrorListener errorListener = null;
+
+ private KuduSQLParser parser = null;
+
+ // Tree listener that accumulates the Kudu predicates and options; null until a parse runs.
+ private KuduSQLParseTreeListener kuduSQLParseTreeListener = null;
+
+ // Full column schema of the target table, used to validate column references.
+ private List<ColumnSchema> allColumnsForThisTable = null;
+
+ // NOTE(review): the message claims "null or empty columns" but only null is checked;
+ // an empty column list is currently accepted — confirm whether that is intentional.
+ public SQLToKuduPredicatesTranslator(String sqlExpresssionForParsing, List<ColumnSchema> tableColumns)
+ throws Exception
+ {
+ Preconditions.checkNotNull(tableColumns,"Kudu table cannot have null or empty columns");
+ Preconditions.checkNotNull(sqlExpresssionForParsing,"Kudu SQL expression cannot be null");
+ sqlExpresssion = sqlExpresssionForParsing;
+ allColumnsForThisTable = tableColumns;
+ parseKuduExpression();
+ }
+
+ // Lexes and parses the SQL expression. Only walks the parse tree (building predicates)
+ // when no syntax error was registered; otherwise logs every recorded error message.
+ public void parseKuduExpression() throws Exception
+ {
+ KuduSQLExpressionLexer lexer = new KuduSQLExpressionLexer(CharStreams.fromString(sqlExpresssion));
+ CommonTokenStream tokens = new CommonTokenStream( lexer );
+ parser = new KuduSQLParser( tokens );
+ errorListener = parser.getKuduSQLExpressionErrorListener();
+ ParseTree parserTree = parser.kudusqlexpression();
+ if (!errorListener.isSyntaxError()) {
+ ParseTreeWalker parseTreeWalker = new ParseTreeWalker();
+ kuduSQLParseTreeListener = new KuduSQLParseTreeListener();
+ kuduSQLParseTreeListener.setColumnSchemaList(allColumnsForThisTable);
+ try {
+ parseTreeWalker.walk(kuduSQLParseTreeListener, parserTree);
+ } catch (Exception ex) {
+ // A listener failure is treated the same as a syntax error so callers have one flag to check.
+ LOG.error(" The supplied SQL expression could not be parsed because " + ex.getMessage(),ex);
+ errorListener.setSyntaxError(true);
+ }
+ } else {
+ LOG.error(" Syntax error present in the Kudu SQL expression. Hence not processing");
+ List<String> allRegisteredSyntaxErrors = errorListener.getListOfErrorMessages();
+ for (String syntaxErrorMessage : allRegisteredSyntaxErrors) {
+ LOG.error(" Error : " + syntaxErrorMessage + " in SQL expression \"" + sqlExpresssion + " \"");
+ }
+ }
+ }
+
+
+ public KuduSQLExpressionErrorListener getErrorListener()
+ {
+ return errorListener;
+ }
+
+ public void setErrorListener(KuduSQLExpressionErrorListener errorListener)
+ {
+ this.errorListener = errorListener;
+ }
+
+ public String getSqlExpresssion()
+ {
+ return sqlExpresssion;
+ }
+
+ public void setSqlExpresssion(String sqlExpresssion)
+ {
+ this.sqlExpresssion = sqlExpresssion;
+ }
+
+ public KuduSQLParser getParser()
+ {
+ return parser;
+ }
+
+ public void setParser(KuduSQLParser parser)
+ {
+ this.parser = parser;
+ }
+
+ public KuduSQLParseTreeListener getKuduSQLParseTreeListener()
+ {
+ return kuduSQLParseTreeListener;
+ }
+
+ public void setKuduSQLParseTreeListener(KuduSQLParseTreeListener kuduSQLParseTreeListener)
+ {
+ this.kuduSQLParseTreeListener = kuduSQLParseTreeListener;
+ }
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/package-info.java
similarity index 56%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/package-info.java
index 64b46c6..85ce16c 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/main/java/org/apache/apex/malhar/kudu/sqltranslator/package-info.java
@@ -16,17 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
-
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+/***
+ * The sqltranslator package classes are responsible for constructing Kudu predicates given a SQL expression
+ * as input. The main approach to build Kudu predicates is by implementing a tree listener that gets
+ * callbacks from the Antlr parser engine basing on the grammar annotations. There is also an Error Listener to
+ * help in marking a given SQL expression with the appropriate error markers so that they can be used in the code
+ * to identify the exact error causes.
*/
-public enum KuduMutationType
-{
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.kudu.sqltranslator;
- INSERT,
- DELETE,
- UPDATE,
- UPSERT
-}
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperatorTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperatorTest.java
new file mode 100644
index 0000000..2ed3e09
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/AbstractKuduInputOperatorTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
+
+import static org.junit.Assert.assertEquals;
+
+// Integration tests for AbstractKuduInputOperator; they only run when a live Kudu cluster
+// is available (gated by KuduClusterAvailabilityTestRule / @KuduClusterTestContext).
+// NOTE(review): the fixed Thread.sleep(10000) waits make these tests slow and potentially
+// flaky on a loaded cluster — consider polling with a timeout instead.
+public class AbstractKuduInputOperatorTest extends KuduInputOperatorCommons
+{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+ // A malformed WHERE clause must produce no tuples in the buffer (errors are routed elsewhere).
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testForErrorPortInCaseOfSQLError() throws Exception
+ {
+ partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+ numberOfKuduInputOperatorPartitions = 5;
+ initOperatorState();
+ truncateTable();
+ addTestDataRows(10);
+ unitTestStepwiseScanInputOperator.getBuffer().clear();
+ unitTestStepwiseScanInputOperator.beginWindow(2L);
+ unitTestStepwiseScanInputOperator.processForQueryString("Select * from unittests where ");
+ unitTestStepwiseScanInputOperator.endWindow();
+ Thread.sleep(10000); // Sleep to allow for scans to complete
+ assertEquals(0,unitTestStepwiseScanInputOperator.getBuffer().size());
+ // restore the shared defaults mutated above so later tests see the expected state
+ partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+ numberOfKuduInputOperatorPartitions = 5;
+
+ }
+
+ // Verifies that each tablet scan contributes its data rows plus one begin and one end
+ // scan marker control tuple to the buffer.
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testForSendingBeginAndEndScanMarkers() throws Exception
+ {
+ partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+ numberOfKuduInputOperatorPartitions = 1;
+ initOperatorState();
+ // NOTE(review): currentScanner is never used below — kept only for debugger inspection?
+ AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> currentScanner =
+ unitTestStepwiseScanInputOperator.getScanner();
+ // truncate and add some data to the unit test table
+ truncateTable();
+ addTestDataRows(10); // This is per partition and there are 12 partitions
+ unitTestStepwiseScanInputOperator.getBuffer().clear();
+ unitTestStepwiseScanInputOperator.beginWindow(5);
+ unitTestStepwiseScanInputOperator.processForQueryString("SELECT * FROM unittests ");
+ Thread.sleep(10000); // Sleep to allow for scans to complete
+ int exptectedCount = (10 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE) +
+ ( 2 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE);
+ assertEquals(exptectedCount,unitTestStepwiseScanInputOperator.getBuffer().size());
+ // below only for debugging session usage and hence no asserts
+ for ( int i = 0; i < exptectedCount; i++) { // 12 partitions : 144 = 12 * 10 + 12 * 2 ( begin , end scan markers )
+ unitTestStepwiseScanInputOperator.emitTuples();
+ }
+ unitTestStepwiseScanInputOperator.endWindow();
+ // revert all of the changes
+ partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+ numberOfKuduInputOperatorPartitions = 5;
+
+ }
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperatorTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperatorTest.java
new file mode 100644
index 0000000..7feb843
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/IncrementalStepScanInputOperatorTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.scanner.KuduScanOrderStrategy;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.TestPortContext;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+// Integration test for IncrementalStepScanInputOperator initialization; runs only when a
+// live Kudu cluster is available (gated by KuduClusterAvailabilityTestRule).
+public class IncrementalStepScanInputOperatorTest extends KuduClientTestCommons
+{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+ private static final Logger LOG = LoggerFactory.getLogger(IncrementalStepScanInputOperatorTest.class);
+
+ public static final String APP_ID = "TestIncrementalScanInputOperator";
+ public static final int OPERATOR_ID_FOR_ONE_TO_ONE_PARTITIONER = 1;
+
+ protected Context.OperatorContext operatorContext;
+ protected TestPortContext testPortContext;
+ protected IncrementalStepScanInputOperator<UnitTestTablePojo,InputOperatorControlTuple>
+ incrementalStepScanInputOperator;
+ protected Collection<Partitioner.Partition<AbstractKuduInputOperator>> partitions;
+
+ // Scan configuration applied to the operator before partitioning.
+ protected KuduPartitionScanStrategy partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+
+ protected KuduScanOrderStrategy scanOrderStrategy = KuduScanOrderStrategy.RANDOM_ORDER_SCANNER;
+
+ protected int numberOfKuduInputOperatorPartitions = 5;
+
+ protected Partitioner.PartitioningContext partitioningContext;
+
+ // Drives the operator through definePartitions/setup/activate the way the Apex engine
+ // would, then rewires the partitioner and scanner back to the chosen partition instance.
+ // NOTE(review): raw types are used for the operator and the ArrayList passed to
+ // definePartitions — consider parameterizing to avoid unchecked warnings.
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testInit() throws Exception
+ {
+ Attribute.AttributeMap.DefaultAttributeMap attributeMapForInputOperator =
+ new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMapForInputOperator.put(DAG.APPLICATION_ID, APP_ID);
+ operatorContext = mockOperatorContext(OPERATOR_ID_FOR_ONE_TO_ONE_PARTITIONER,
+ attributeMapForInputOperator);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributesForInputOperator =
+ new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributesForInputOperator.put(Context.PortContext.TUPLE_CLASS, UnitTestTablePojo.class);
+ testPortContext = new TestPortContext(portAttributesForInputOperator);
+
+ incrementalStepScanInputOperator = new IncrementalStepScanInputOperator(UnitTestTablePojo.class,
+ "kuduincrementalstepscaninputoperator.properties");
+ incrementalStepScanInputOperator.setNumberOfPartitions(numberOfKuduInputOperatorPartitions);
+ incrementalStepScanInputOperator.setPartitionScanStrategy(partitonScanStrategy);
+ incrementalStepScanInputOperator.setScanOrderStrategy(scanOrderStrategy);
+ partitioningContext = new Partitioner.PartitioningContext()
+ {
+ @Override
+ public int getParallelPartitionCount()
+ {
+ return numberOfKuduInputOperatorPartitions;
+ }
+
+ @Override
+ public List<Operator.InputPort<?>> getInputPorts()
+ {
+ return null;
+ }
+ };
+ partitions = incrementalStepScanInputOperator.definePartitions(
+ new ArrayList(), partitioningContext);
+ Iterator<Partitioner.Partition<AbstractKuduInputOperator>> iteratorForMeta = partitions.iterator();
+ IncrementalStepScanInputOperator actualOperator =
+ (IncrementalStepScanInputOperator)iteratorForMeta.next().getPartitionedInstance();
+ // Adjust the bindings as if apex has completed the partioning.The runtime of the framework does this in reality
+ incrementalStepScanInputOperator = actualOperator;
+ incrementalStepScanInputOperator.setup(operatorContext);
+ incrementalStepScanInputOperator.activate(operatorContext);
+ //rewire parent operator to enable proper unit testing method calls
+ incrementalStepScanInputOperator.getPartitioner().setPrototypeKuduInputOperator(incrementalStepScanInputOperator);
+ incrementalStepScanInputOperator.getScanner().setParentOperator(incrementalStepScanInputOperator);
+ }
+
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduClientTestCommons.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduClientTestCommons.java
new file mode 100644
index 0000000..b7c63f4
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduClientTestCommons.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/** Common test code for all kudu operators
+ */
+public class KuduClientTestCommons
+{
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(KuduClientTestCommons.class);
+
+
+ protected static final String tableName = "unittests";
+
+ protected static KuduClient kuduClient;
+
+ protected static KuduTable kuduTable;
+
+ protected static Schema schemaForUnitTests;
+
+ protected static String kuduMasterAddresses = "192.168.1.41:7051";
+
+ public static final int SPLIT_COUNT_FOR_INT_ROW_KEY = 5;
+
+ public static final int HASH_BUCKETS_SIZE_FOR_ALL_HASH_COL = 2;
+
+ public static final int TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE = 12;
+
+ public static boolean tableInitialized = false;
+
+ public static Object objectForLocking = new Object();
+
+ protected static Map<String,ColumnSchema> columnDefs = new HashMap<>();
+
+
+ // Initializes the shared static test fixtures: builds a client, drops any stale copy of
+ // the unit test table, recreates it and caches the table handle. Assignment order matters:
+ // kuduClient must be set before createTestTable/openTable use it.
+ public static void setup() throws Exception
+ {
+ kuduClient = getClientHandle();
+ if (kuduClient.tableExists(tableName)) {
+ kuduClient.deleteTable(tableName);
+ }
+ createTestTable(tableName,kuduClient);
+ kuduTable = kuduClient.openTable(tableName);
+ tableInitialized = true;
+ }
+
+
+ // Releases the shared client built in setup(). Assumes setup() ran; kuduClient would be
+ // null otherwise.
+ public static void shutdown() throws Exception
+ {
+ kuduClient.close();
+ }
+
+
+  /** Builds a fresh Kudu client pointed at the configured master address list. */
+  private static KuduClient getClientHandle() throws Exception
+  {
+    return new KuduClient.KuduClientBuilder(kuduMasterAddresses).build();
+  }
+
+  /**
+   * Builds a connection builder preconfigured with the test master address and the unit
+   * test table name.
+   */
+  public static ApexKuduConnection.ApexKuduConnectionBuilder getConnectionConfigForTable()
+  {
+    return new ApexKuduConnection.ApexKuduConnectionBuilder()
+        .withAPossibleMasterHostAs(kuduMasterAddresses)
+        .withTableName(tableName);
+  }
+
+  /**
+   * Lazily builds (and caches in {@code schemaForUnitTests}) the schema for the unit test
+   * table: a composite primary key (introwkey, stringrowkey, timestamprowkey) followed by
+   * nullable data columns covering the major Kudu column types. Every column is also
+   * registered in {@code columnDefs} for name based lookups by the tests.
+   * @return the cached or freshly constructed schema
+   */
+  public static Schema buildSchemaForUnitTestsTable() throws Exception
+  {
+    if (schemaForUnitTests != null) {
+      return schemaForUnitTests;
+    }
+    List<ColumnSchema> columns = new ArrayList<>();
+    // Primary key columns; the order of addition defines the composite key order.
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("introwkey", Type.INT32)
+        .key(true)
+        .build());
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("stringrowkey", Type.STRING)
+        .key(true)
+        .build());
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("timestamprowkey", Type.UNIXTIME_MICROS)
+        .key(true)
+        .build());
+    // Nullable data columns, one per major Kudu type.
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("longdata", Type.INT64)
+        .nullable(true)
+        .build());
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("stringdata", Type.STRING)
+        .nullable(true)
+        .build());
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("timestampdata", Type.UNIXTIME_MICROS)
+        .nullable(true)
+        .build());
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("binarydata", Type.BINARY)
+        .nullable(true)
+        .build());
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("floatdata", Type.FLOAT)
+        .nullable(true)
+        .build());
+    registerColumn(columns, new ColumnSchema.ColumnSchemaBuilder("booldata", Type.BOOL)
+        .nullable(true)
+        .build());
+    schemaForUnitTests = new Schema(columns);
+    return schemaForUnitTests;
+  }
+
+  /** Adds the column both to the schema column list and to the name based lookup map. */
+  private static void registerColumn(List<ColumnSchema> columns, ColumnSchema column)
+  {
+    columns.add(column);
+    columnDefs.put(column.getName(), column);
+  }
+
+ // Creates the unit test table: hash partitioned on (stringrowkey, timestamprowkey) into
+ // HASH_BUCKETS_SIZE_FOR_ALL_HASH_COL buckets, range partitioned on introwkey with
+ // SPLIT_COUNT_FOR_INT_ROW_KEY split rows (= SPLIT_COUNT + 1 ranges). Total tablets =
+ // buckets * ranges, which matches TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE (2 * 6 = 12).
+ public static void createTestTable(String tableName, KuduClient client) throws Exception
+ {
+ List<String> rangeKeys = new ArrayList<>();
+ rangeKeys.add("introwkey");
+ List<String> hashPartitions = new ArrayList<>();
+ hashPartitions.add("stringrowkey");
+ hashPartitions.add("timestamprowkey");
+ CreateTableOptions thisTableOptions = new CreateTableOptions()
+ .setNumReplicas(1)
+ .addHashPartitions(hashPartitions,HASH_BUCKETS_SIZE_FOR_ALL_HASH_COL)
+ .setRangePartitionColumns(rangeKeys);
+ // Evenly spaced split points across the non-negative int range.
+ int stepsize = Integer.MAX_VALUE / SPLIT_COUNT_FOR_INT_ROW_KEY;
+ int splitBoundary = stepsize;
+ Schema schema = buildSchemaForUnitTestsTable();
+ for ( int i = 0; i < SPLIT_COUNT_FOR_INT_ROW_KEY; i++) {
+ PartialRow splitRowBoundary = schema.newPartialRow();
+ splitRowBoundary.addInt("introwkey",splitBoundary);
+ thisTableOptions = thisTableOptions.addSplitRow(splitRowBoundary);
+ splitBoundary += stepsize;
+ }
+ try {
+ client.createTable(tableName, schema,thisTableOptions);
+ } catch (KuduException e) {
+ LOG.error("Error while creating table for unit tests " + e.getMessage(), e);
+ throw e;
+ }
+
+ }
+
+ protected void lookUpAndPopulateRecord(UnitTestTablePojo keyInfo) throws Exception // fetches the row matching keyInfo's primary key and copies its data columns into keyInfo
+ {
+ KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable)
+ .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("introwkey"),
+ KuduPredicate.ComparisonOp.EQUAL,keyInfo.getIntrowkey()))
+ .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("stringrowkey"),
+ KuduPredicate.ComparisonOp.EQUAL,keyInfo.getStringrowkey()))
+ .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("timestamprowkey"),
+ KuduPredicate.ComparisonOp.EQUAL,keyInfo.getTimestamprowkey()))
+ .build();
+ RowResultIterator rowResultItr = scanner.nextRows();
+ while (rowResultItr.hasNext()) {
+ RowResult thisRow = rowResultItr.next();
+ keyInfo.setFloatdata(thisRow.getFloat("floatdata"));
+ keyInfo.setBooldata(thisRow.getBoolean("booldata"));
+ keyInfo.setBinarydata(thisRow.getBinary("binarydata"));
+ keyInfo.setLongdata(thisRow.getLong("longdata"));
+ keyInfo.setTimestampdata(thisRow.getLong("timestampdata"));
+ keyInfo.setStringdata(thisRow.getString("stringdata")); // fix: read the column value instead of assigning the literal "stringdata"
+ break;
+ }
+ }
+
+ public ApexKuduConnection buildMockWiring(AbstractKuduInputOperator abstractKuduInputOperator,
+ int numScanTokens) throws Exception // wires PowerMockito mocks so input-operator tests can run without a live Kudu cluster
+ {
+ ApexKuduConnection mockedConnectionHandle = PowerMockito.mock(ApexKuduConnection.class);
+ ApexKuduConnection.ApexKuduConnectionBuilder mockedConnectionHandleBuilder = PowerMockito.mock(
+ ApexKuduConnection.ApexKuduConnectionBuilder.class);
+ KuduClient mockedClient = PowerMockito.mock(KuduClient.class);
+ KuduSession mockedKuduSession = PowerMockito.mock(KuduSession.class);
+ KuduTable mockedKuduTable = PowerMockito.mock(KuduTable.class);
+ KuduScanToken.KuduScanTokenBuilder mockedScanTokenBuilder = PowerMockito.mock(
+ KuduScanToken.KuduScanTokenBuilder.class);
+ List<KuduScanToken> mockedScanTokens = new ArrayList<>();
+ int scanTokensToBuild = numScanTokens;
+ for (int i = 0; i < scanTokensToBuild; i++) {
+ mockedScanTokens.add(PowerMockito.mock(KuduScanToken.class));
+ }
+ PowerMockito.mockStatic(KryoCloneUtils.class);
+ when(KryoCloneUtils.cloneObject(abstractKuduInputOperator)).thenReturn(abstractKuduInputOperator); // clone returns the same instance so assertions observe the operator's state
+ //wire the mocks
+ when(abstractKuduInputOperator.getApexKuduConnectionInfo()).thenReturn(mockedConnectionHandleBuilder);
+ when(mockedConnectionHandle.getKuduClient()).thenReturn(mockedClient);
+ when(mockedClient.newSession()).thenReturn(mockedKuduSession);
+ when(mockedConnectionHandle.getKuduTable()).thenReturn(mockedKuduTable);
+ when(mockedConnectionHandle.getKuduSession()).thenReturn(mockedKuduSession);
+ when(mockedConnectionHandle.getBuilderForThisConnection()).thenReturn(mockedConnectionHandleBuilder);
+ when(mockedClient.openTable(tableName)).thenReturn(mockedKuduTable);
+ when(mockedConnectionHandleBuilder.build()).thenReturn(mockedConnectionHandle);
+ when(mockedKuduTable.getSchema()).thenReturn(schemaForUnitTests);
+ when(mockedClient.newScanTokenBuilder(mockedKuduTable)).thenReturn(mockedScanTokenBuilder);
+ when(mockedScanTokenBuilder.build()).thenReturn(mockedScanTokens);
+ return mockedConnectionHandle;
+ }
+
+ public static String getKuduMasterAddresses() // host:port CSV of the Kudu masters shared by all tests
+ {
+ return kuduMasterAddresses;
+ }
+
+ public static void setKuduMasterAddresses(String kuduMasterAddresses)
+ {
+ KuduClientTestCommons.kuduMasterAddresses = kuduMasterAddresses;
+ }
+}
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
similarity index 65%
rename from contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
rename to kudu/src/test/java/org/apache/apex/malhar/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
index 615427a..b4ab141 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java
@@ -16,82 +16,47 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.junit.AfterClass;
+
import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduPredicate;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
+
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.TestPortContext;
+
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
import static org.junit.Assert.assertEquals;
-
-public class KuduCreateUpdateDeleteOutputOperatorTest
+public class KuduCreateUpdateDeleteOutputOperatorTest extends KuduClientTestCommons
{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
private static final transient Logger LOG = LoggerFactory.getLogger(KuduCreateUpdateDeleteOutputOperatorTest.class);
- private static final String tableName = "unittests";
-
private final String APP_ID = "TestKuduOutputOperator";
private final int OPERATOR_ID_FOR_KUDU_CRUD = 0;
- private static KuduClient kuduClient;
-
- private static KuduTable kuduTable;
-
- private static Map<String,ColumnSchema> columnDefs = new HashMap<>();
-
private BaseKuduOutputOperator simpleKuduOutputOperator;
private OperatorContext contextForKuduOutputOperator;
private TestPortContext testPortContextForKuduOutput;
- @BeforeClass
- public static void setup() throws Exception
- {
- kuduClient = getClientHandle();
- if (kuduClient.tableExists(tableName)) {
- kuduClient.deleteTable(tableName);
- }
- createTestTable(tableName,kuduClient);
- kuduTable = kuduClient.openTable(tableName);
- }
-
- @AfterClass
- public static void shutdown() throws Exception
- {
- kuduClient.close();
- }
-
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
@Before
public void setUpKuduOutputOperatorContext() throws Exception
{
@@ -107,96 +72,8 @@ public class KuduCreateUpdateDeleteOutputOperatorTest
simpleKuduOutputOperator.input.setup(testPortContextForKuduOutput);
}
- private static KuduClient getClientHandle() throws Exception
- {
- KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder("localhost:7051");
- KuduClient client = builder.build();
- return client;
- }
-
- private static void createTestTable(String tableName, KuduClient client) throws Exception
- {
- List<ColumnSchema> columns = new ArrayList<>();
- ColumnSchema intRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("introwkey", Type.INT32)
- .key(true)
- .build();
- columns.add(intRowKeyCol);
- columnDefs.put("introwkey",intRowKeyCol);
- ColumnSchema stringRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("stringrowkey", Type.STRING)
- .key(true)
- .build();
- columns.add(stringRowKeyCol);
- columnDefs.put("stringrowkey",stringRowKeyCol);
- ColumnSchema timestampRowKey = new ColumnSchema.ColumnSchemaBuilder("timestamprowkey", Type.UNIXTIME_MICROS)
- .key(true)
- .build();
- columns.add(timestampRowKey);
- columnDefs.put("timestamprowkey",timestampRowKey);
- ColumnSchema longData = new ColumnSchema.ColumnSchemaBuilder("longdata", Type.INT64)
- .build();
- columns.add(longData);
- columnDefs.put("longdata",longData);
- ColumnSchema stringData = new ColumnSchema.ColumnSchemaBuilder("stringdata", Type.STRING)
- .build();
- columns.add(stringData);
- columnDefs.put("stringdata",stringData);
- ColumnSchema timestampdata = new ColumnSchema.ColumnSchemaBuilder("timestampdata", Type.UNIXTIME_MICROS)
- .build();
- columns.add(timestampdata);
- columnDefs.put("timestampdata",timestampdata);
- ColumnSchema binarydata = new ColumnSchema.ColumnSchemaBuilder("binarydata", Type.BINARY)
- .build();
- columns.add(binarydata);
- columnDefs.put("binarydata",binarydata);
- ColumnSchema floatdata = new ColumnSchema.ColumnSchemaBuilder("floatdata", Type.FLOAT)
- .build();
- columns.add(floatdata);
- columnDefs.put("floatdata",floatdata);
- ColumnSchema booldata = new ColumnSchema.ColumnSchemaBuilder("booldata", Type.BOOL)
- .build();
- columns.add(booldata);
- columnDefs.put("booldata",booldata);
- List<String> rangeKeys = new ArrayList<>();
- rangeKeys.add("stringrowkey");
- rangeKeys.add("timestamprowkey");
- List<String> hashPartitions = new ArrayList<>();
- hashPartitions.add("introwkey");
- Schema schema = new Schema(columns);
- try {
- client.createTable(tableName, schema,
- new CreateTableOptions()
- .setNumReplicas(1)
- .setRangePartitionColumns(rangeKeys)
- .addHashPartitions(hashPartitions,2));
- } catch (KuduException e) {
- LOG.error("Error while creating table for unit tests " + e.getMessage(), e);
- throw e;
- }
- }
-
- private void lookUpAndPopulateRecord(UnitTestTablePojo keyInfo) throws Exception
- {
- KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable)
- .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("introwkey"),
- KuduPredicate.ComparisonOp.EQUAL,keyInfo.getIntrowkey()))
- .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("stringrowkey"),
- KuduPredicate.ComparisonOp.EQUAL,keyInfo.getStringrowkey()))
- .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("timestamprowkey"),
- KuduPredicate.ComparisonOp.EQUAL,keyInfo.getTimestamprowkey()))
- .build();
- RowResultIterator rowResultItr = scanner.nextRows();
- while (rowResultItr.hasNext()) {
- RowResult thisRow = rowResultItr.next();
- keyInfo.setFloatdata(thisRow.getFloat("floatdata"));
- keyInfo.setBooldata(thisRow.getBoolean("booldata"));
- keyInfo.setBinarydata(thisRow.getBinary("binarydata"));
- keyInfo.setLongdata(thisRow.getLong("longdata"));
- keyInfo.setTimestampdata(thisRow.getLong("timestampdata"));
- keyInfo.setStringdata("stringdata");
- break;
- }
- }
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
@Test
public void processForUpdate() throws Exception
{
@@ -235,6 +112,7 @@ public class KuduCreateUpdateDeleteOutputOperatorTest
assertEquals(unitTestTablePojoRead.isBooldata(), false);
}
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
@Test
public void processForUpsert() throws Exception
{
@@ -267,6 +145,7 @@ public class KuduCreateUpdateDeleteOutputOperatorTest
assertEquals(unitTestTablePojoRead.isBooldata(), true);
}
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
@Test
public void processForDelete() throws Exception
{
@@ -304,6 +183,7 @@ public class KuduCreateUpdateDeleteOutputOperatorTest
assertEquals(unitTestTablePojoRead.getBinarydata(), null);
}
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
@Test
public void processForInsert() throws Exception
{
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduInputOperatorCommons.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduInputOperatorCommons.java
new file mode 100644
index 0000000..19a815c
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/KuduInputOperatorCommons.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.scanner.AbstractKuduPartitionScanner;
+import org.apache.apex.malhar.kudu.scanner.KuduScanOrderStrategy;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.apache.kudu.client.Upsert;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.TestPortContext;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+public class KuduInputOperatorCommons extends KuduClientTestCommons
+{
+ private static final Logger LOG = LoggerFactory.getLogger(KuduInputOperatorCommons.class);
+
+ public static final String APP_ID = "TestKuduInputOperatorOneToOnePartitioner";
+ public static final int OPERATOR_ID_FOR_ONE_TO_ONE_PARTITIONER = 1;
+
+ protected Context.OperatorContext operatorContext;
+ protected TestPortContext testPortContext;
+ protected UnitTestStepwiseScanInputOperator unitTestStepwiseScanInputOperator;
+ protected Collection<Partitioner.Partition<AbstractKuduInputOperator>> partitions;
+
+ protected KuduPartitionScanStrategy partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+
+ protected KuduScanOrderStrategy scanOrderStrategy = KuduScanOrderStrategy.RANDOM_ORDER_SCANNER;
+
+ protected static int numberOfKuduInputOperatorPartitions = 5;
+
+ protected static Partitioner.PartitioningContext partitioningContext;
+
+ public void initOperatorState() throws Exception // builds, partitions and activates a test input operator the way the Apex engine would
+ {
+ Attribute.AttributeMap.DefaultAttributeMap attributeMapForInputOperator =
+ new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMapForInputOperator.put(DAG.APPLICATION_ID, APP_ID);
+ operatorContext = mockOperatorContext(OPERATOR_ID_FOR_ONE_TO_ONE_PARTITIONER,
+ attributeMapForInputOperator);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributesForInputOperator =
+ new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributesForInputOperator.put(Context.PortContext.TUPLE_CLASS, UnitTestTablePojo.class);
+ testPortContext = new TestPortContext(portAttributesForInputOperator);
+
+ unitTestStepwiseScanInputOperator = new UnitTestStepwiseScanInputOperator(
+ getConnectionConfigForTable(), UnitTestTablePojo.class);
+ unitTestStepwiseScanInputOperator.setNumberOfPartitions(numberOfKuduInputOperatorPartitions);
+ unitTestStepwiseScanInputOperator.setPartitionScanStrategy(partitonScanStrategy);
+ unitTestStepwiseScanInputOperator.setScanOrderStrategy(scanOrderStrategy);
+ initCommonConfigsForAllTypesOfTests();
+ partitions = unitTestStepwiseScanInputOperator.definePartitions(
+ new ArrayList(), partitioningContext);
+ Iterator<Partitioner.Partition<AbstractKuduInputOperator>> iteratorForMeta = partitions.iterator();
+ UnitTestStepwiseScanInputOperator actualOperator =
+ (UnitTestStepwiseScanInputOperator)iteratorForMeta.next()
+ .getPartitionedInstance();
+ // Adjust the bindings as if apex has completed the partitioning. The runtime of the framework does this in reality
+ unitTestStepwiseScanInputOperator = actualOperator;
+ unitTestStepwiseScanInputOperator.setup(operatorContext);
+ unitTestStepwiseScanInputOperator.activate(operatorContext);
+ //rewire parent operator to enable proper unit testing method calls
+ unitTestStepwiseScanInputOperator.getPartitioner().setPrototypeKuduInputOperator(unitTestStepwiseScanInputOperator);
+ unitTestStepwiseScanInputOperator.getScanner().setParentOperator(unitTestStepwiseScanInputOperator);
+ }
+
+ public static void initCommonConfigsForAllTypesOfTests() throws Exception // one-time shared setup: table schema plus a partitioning context reporting the configured partition count
+ {
+ KuduClientTestCommons.buildSchemaForUnitTestsTable();
+ partitioningContext = new Partitioner.PartitioningContext()
+ {
+ @Override
+ public int getParallelPartitionCount()
+ {
+ return numberOfKuduInputOperatorPartitions;
+ }
+
+ @Override
+ public List<Operator.InputPort<?>> getInputPorts()
+ {
+ return null; // ports are irrelevant for partition-count tests
+ }
+ };
+ }
+
+ public void truncateTable() throws Exception // deletes every row of the test table by scanning all tablets and issuing per-row deletes
+ {
+ AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> scannerForDeletingRows =
+ unitTestStepwiseScanInputOperator.getScanner();
+ List<KuduScanToken> scansForAllTablets = unitTestStepwiseScanInputOperator
+ .getPartitioner().getKuduScanTokensForSelectAllColumns();
+ ApexKuduConnection aCurrentConnection = scannerForDeletingRows.getConnectionPoolForThreads().get(0);
+ KuduSession aSessionForDeletes = aCurrentConnection.getKuduClient().newSession();
+ KuduTable currentTable = aCurrentConnection.getKuduTable();
+ for ( KuduScanToken aTabletScanToken : scansForAllTablets) {
+ KuduScanner aScanner = aTabletScanToken.intoScanner(aCurrentConnection.getKuduClient());
+ while ( aScanner.hasMoreRows()) {
+ RowResultIterator itrForRows = aScanner.nextRows();
+ while ( itrForRows.hasNext()) {
+ RowResult aRow = itrForRows.next();
+ int intRowKey = aRow.getInt("introwkey");
+ String stringRowKey = aRow.getString("stringrowkey");
+ long timestampRowKey = aRow.getLong("timestamprowkey");
+ Delete aDeleteOp = currentTable.newDelete();
+ aDeleteOp.getRow().addInt("introwkey",intRowKey);
+ aDeleteOp.getRow().addString("stringrowkey", stringRowKey);
+ aDeleteOp.getRow().addLong("timestamprowkey",timestampRowKey);
+ aSessionForDeletes.apply(aDeleteOp);
+ }
+ }
+ }
+ aSessionForDeletes.close();
+ Thread.sleep(2000); // Sleep to allow the applied deletes to take effect before the next test step
+ }
+
+ public void addTestDataRows(int numRowsInEachPartition) throws Exception // upserts numRowsInEachPartition rows into every range/hash partition combination
+ {
+ int intRowKeyStepsize = Integer.MAX_VALUE / SPLIT_COUNT_FOR_INT_ROW_KEY;
+ int splitBoundaryForIntRowKey = intRowKeyStepsize;
+ int[] inputrowkeyPartitionEntries = new int[SPLIT_COUNT_FOR_INT_ROW_KEY + 1];
+ // setting the int keys that will fall in the range of all partitions
+ for ( int i = 0; i < SPLIT_COUNT_FOR_INT_ROW_KEY; i++) {
+ inputrowkeyPartitionEntries[i] = splitBoundaryForIntRowKey + 3; // 3 to fall into the partition next to boundary
+ splitBoundaryForIntRowKey += intRowKeyStepsize;
+ }
+ inputrowkeyPartitionEntries[SPLIT_COUNT_FOR_INT_ROW_KEY] = splitBoundaryForIntRowKey + 3;
+ AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> scannerForAddingRows =
+ unitTestStepwiseScanInputOperator.getScanner();
+ ApexKuduConnection aCurrentConnection = scannerForAddingRows.getConnectionPoolForThreads().get(0);
+ KuduSession aSessionForInserts = aCurrentConnection.getKuduClient().newSession();
+ KuduTable currentTable = aCurrentConnection.getKuduTable();
+ long seedValueForTimestampRowKey = 0L; // constant to allow for data landing on first partition for unit tests
+ for ( int i = 0; i <= SPLIT_COUNT_FOR_INT_ROW_KEY; i++) { // range key iterator
+ int intRowKeyBaseValue = inputrowkeyPartitionEntries[i] + i;
+ for ( int k = 0; k < 2; k++) { // hash key iterator . The table defines two hash partitions
+ long timestampRowKeyValue = seedValueForTimestampRowKey + k; // to avoid spilling to another tablet
+ String stringRowKeyValue = "" + timestampRowKeyValue + k; // to avoid spilling to another tablet randomly
+ for ( int y = 0; y < numRowsInEachPartition; y++) {
+ Upsert aNewRow = currentTable.newUpsert();
+ PartialRow rowValue = aNewRow.getRow();
+ // Start assigning row keys below the current split boundary.
+ rowValue.addInt("introwkey",intRowKeyBaseValue - y - 1);
+ rowValue.addString("stringrowkey",stringRowKeyValue);
+ rowValue.addLong("timestamprowkey",timestampRowKeyValue);
+ rowValue.addLong("longdata",(seedValueForTimestampRowKey + y));
+ rowValue.addString("stringdata", ("" + seedValueForTimestampRowKey + y));
+ OperationResponse response = aSessionForInserts.apply(aNewRow); // NOTE(review): response unused; per-row errors go unchecked
+ }
+ }
+ }
+ List<OperationResponse> insertResponse = aSessionForInserts.flush(); // NOTE(review): flush responses unused; batch errors go unchecked
+ aSessionForInserts.close();
+ Thread.sleep(2000); // Sleep to allow the flushed inserts to become visible to subsequent scans
+ }
+
+ public long countNumRowsInTable() throws Exception
+ {
+ // Counts all rows currently in the test table by projecting every column and scanning fully.
+ List<String> allProjectedCols = new ArrayList<>(
+ unitTestStepwiseScanInputOperator.getKuduColNameToSchemaMapping().keySet());
+ KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable)
+ .setProjectedColumnNames(allProjectedCols)
+ .build();
+ long counter = 0;
+ while (scanner.hasMoreRows()) {
+ RowResultIterator rowResultItr = scanner.nextRows();
+ // Row contents are irrelevant for counting; the batch size is all that is needed,
+ // which also avoids materializing an unused RowResult per row.
+ counter += rowResultItr.getNumRows();
+ }
+ return counter;
+ }
+}
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/SimpleKuduOutputOperator.java
similarity index 81%
rename from contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
rename to kudu/src/test/java/org/apache/apex/malhar/kudu/SimpleKuduOutputOperator.java
index 3933df7..81e93b7 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/SimpleKuduOutputOperator.java
@@ -16,19 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
+import org.junit.Rule;
+
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
import org.apache.kudu.client.ExternalConsistencyMode;
import org.apache.kudu.client.SessionConfiguration;
public class SimpleKuduOutputOperator extends AbstractKuduOutputOperator
{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
@Override
ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig()
{
return new ApexKuduConnection.ApexKuduConnectionBuilder()
- .withAPossibleMasterHostAs("localhost:7051")
- .withTableName("unittests")
+ .withAPossibleMasterHostAs(KuduClientTestCommons.kuduMasterAddresses)
+ .withTableName(KuduClientTestCommons.tableName)
.withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
.withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
.withNumberOfBossThreads(1)
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestStepwiseScanInputOperator.java
similarity index 53%
copy from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
copy to kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestStepwiseScanInputOperator.java
index 64b46c6..5f98411 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestStepwiseScanInputOperator.java
@@ -16,17 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
-/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
- */
-public enum KuduMutationType
+
+public class UnitTestStepwiseScanInputOperator extends AbstractKuduInputOperator<UnitTestTablePojo,InputOperatorControlTuple>
{
+ private int start = 0;
+
+ private int step = 2;
+
+ public UnitTestStepwiseScanInputOperator(ApexKuduConnection.ApexKuduConnectionBuilder kuduConnectionInfo,
+ Class<UnitTestTablePojo> clazzForPOJO) throws Exception
+ {
+ super(kuduConnectionInfo, clazzForPOJO);
+ }
+
+ public UnitTestStepwiseScanInputOperator()
+ {
+ }
- INSERT,
- DELETE,
- UPDATE,
- UPSERT
+ @Override
+ protected String getNextQuery() // each call produces the next stepwise window [start, start + step]
+ {
+ return "select timestamprowkey,longdata,stringdata from unittests where introwkey >= " + start +
+ " and introwkey <= " + (start++ + step); // leading space fixes malformed SQL like "...introwkey >= 0and introwkey..."
+ }
}
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestTablePojo.java
similarity index 83%
rename from contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
rename to kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestTablePojo.java
index dc2cc33..da69ba1 100644
--- a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/UnitTestTablePojo.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu;
import java.nio.ByteBuffer;
@@ -122,4 +122,20 @@ public class UnitTestTablePojo
{
this.booldata = booldata;
}
+
+ @Override
+ public String toString()
+ {
+ return "UnitTestTablePojo{" +
+ "introwkey=" + introwkey +
+ ", stringrowkey='" + stringrowkey + '\'' +
+ ", timestamprowkey=" + timestamprowkey +
+ ", longdata=" + longdata +
+ ", stringdata='" + stringdata + '\'' +
+ ", timestampdata=" + timestampdata +
+ ", binarydata=" + binarydata +
+ ", floatdata=" + floatdata +
+ ", booldata=" + booldata +
+ '}';
+ }
}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitionerTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitionerTest.java
new file mode 100644
index 0000000..6beea0b
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/AbstractKuduInputPartitionerTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+import org.apache.kudu.client.KuduScanToken;
+
+import com.datatorrent.api.Partitioner;
+
+import static org.junit.Assert.assertEquals;
+
+public class AbstractKuduInputPartitionerTest extends KuduInputOperatorCommons
+{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testKuduSelectAllScanTokens() throws Exception // select-all scan must yield one token per tablet of the test table
+ {
+ initOperatorState();
+ AbstractKuduInputPartitioner partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+ List<KuduScanToken> allScanTokens = partitioner.getKuduScanTokensForSelectAllColumns();
+ assertEquals(KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE,allScanTokens.size());
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testNumberOfPartitions() throws Exception
+ {
+ numberOfKuduInputOperatorPartitions = -1; // situation when the user has not set the partitions explicitly
+ initOperatorState();
+ AbstractKuduInputPartitioner partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+ assertEquals(1,partitioner.getNumberOfPartitions(partitioningContext));
+ numberOfKuduInputOperatorPartitions = 7; // situation when the user has explicitly set the partition count
+ initOperatorState();
+ partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+ assertEquals(7,partitioner.getNumberOfPartitions(partitioningContext));
+ numberOfKuduInputOperatorPartitions = -1; // revert the value back to default
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testDefinePartitions() throws Exception // verifies one-to-one vs many-to-one tablet/operator partition assignment
+ {
+ // case of setting num partitions to 7 but setting one to one mapping.
+ numberOfKuduInputOperatorPartitions = 7;
+ initOperatorState();
+ AbstractKuduInputPartitioner partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+ assertEquals(KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE,partitioner.definePartitions(
+ new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(),partitioningContext).size());
+ // case of setting num partition to 7 but go with many to one mapping
+ partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+ initOperatorState();
+ partitioner = unitTestStepwiseScanInputOperator.getPartitioner();
+ Collection<Partitioner.Partition<AbstractKuduInputOperator>> partitionsCalculated = partitioner.definePartitions(
+ new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(),partitioningContext);
+ assertEquals(7,partitionsCalculated.size());
+ int maxPartitionPerOperator = -1;
+ int minPartitionPerOperator = Integer.MAX_VALUE;
+ Iterator<Partitioner.Partition<AbstractKuduInputOperator>> iteratorForPartitionScan =
+ partitionsCalculated.iterator();
+ while ( iteratorForPartitionScan.hasNext()) {
+ Partitioner.Partition<AbstractKuduInputOperator> anOperatorPartition = iteratorForPartitionScan.next();
+ AbstractKuduInputOperator<UnitTestTablePojo,InputOperatorControlTuple> anOperatorHandle =
+ anOperatorPartition.getPartitionedInstance();
+ List<KuduPartitionScanAssignmentMeta> partitionsAssigned = anOperatorHandle.getPartitionPieAssignment();
+ if ( partitionsAssigned.size() > maxPartitionPerOperator ) {
+ maxPartitionPerOperator = partitionsAssigned.size();
+ }
+ if ( partitionsAssigned.size() < minPartitionPerOperator) {
+ minPartitionPerOperator = partitionsAssigned.size();
+ }
+ }
+ assertEquals(2,maxPartitionPerOperator); // 7 kudu operator instances dealing with 12 kudu tablets
+ assertEquals(1,minPartitionPerOperator); // 7 kudu operator instances dealing with 12 kudu tablets
+ // revert all the changes to defaults for the next test
+ numberOfKuduInputOperatorPartitions = 5;
+ partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+ }
+
+
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitionerTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitionerTest.java
new file mode 100644
index 0000000..97768d2
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToManyPartitionerTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.junit.MatcherAssert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KryoCloneUtils.class})
+public class KuduOneToManyPartitionerTest extends KuduInputOperatorCommons
+{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+ @KuduClusterTestContext(kuduClusterBasedTest = false)
+ @Test
+ public void testAssignPartitions() throws Exception
+ {
+ AbstractKuduInputOperator<UnitTestTablePojo,InputOperatorControlTuple> mockedInputOperator =
+ PowerMockito.mock(AbstractKuduInputOperator.class);
+ when(mockedInputOperator.getNumberOfPartitions()).thenReturn(5);
+ PowerMockito.mockStatic(KryoCloneUtils.class);
+ when(KryoCloneUtils.cloneObject(mockedInputOperator)).thenReturn(mockedInputOperator);
+ KuduOneToManyPartitioner kuduOneToManyPartitioner = new KuduOneToManyPartitioner(mockedInputOperator);
+ buildMockWiring(mockedInputOperator, KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE);
+ kuduOneToManyPartitioner.setPrototypeKuduInputOperator(mockedInputOperator);
+ Map<Integer,List<KuduPartitionScanAssignmentMeta>> assignedPartitions = kuduOneToManyPartitioner.assign(
+ kuduOneToManyPartitioner.getListOfPartitionAssignments(
+ new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(),
+ partitioningContext),partitioningContext);
+ assertThat(assignedPartitions.size(), is(5));
+ for (List<KuduPartitionScanAssignmentMeta> eachOperatorassignment: assignedPartitions.values()) {
+ assertThat(eachOperatorassignment.size(), lessThanOrEqualTo(3));
+ }
+ for (List<KuduPartitionScanAssignmentMeta> eachOperatorassignment: assignedPartitions.values()) {
+ assertThat(eachOperatorassignment.size(), greaterThanOrEqualTo(2));
+ }
+ }
+
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitionerTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitionerTest.java
new file mode 100644
index 0000000..7d0d417
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/partitioner/KuduOneToOnePartitionerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.apex.malhar.kudu.AbstractKuduInputOperator;
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.scanner.KuduPartitionScanAssignmentMeta;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.junit.MatcherAssert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KryoCloneUtils.class})
+public class KuduOneToOnePartitionerTest extends KuduInputOperatorCommons
+{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+ @KuduClusterTestContext(kuduClusterBasedTest = false)
+ @Test
+ public void testAssignPartitions() throws Exception
+ {
+ AbstractKuduInputOperator<UnitTestTablePojo,InputOperatorControlTuple> mockedInputOperator =
+ PowerMockito.mock(AbstractKuduInputOperator.class);
+ when(mockedInputOperator.getNumberOfPartitions()).thenReturn(9);
+ PowerMockito.mockStatic(KryoCloneUtils.class);
+ when(KryoCloneUtils.cloneObject(mockedInputOperator)).thenReturn(mockedInputOperator);
+ KuduOneToOnePartitioner kuduOneToOnePartitioner = new KuduOneToOnePartitioner(mockedInputOperator);
+ buildMockWiring(mockedInputOperator, KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE);
+ kuduOneToOnePartitioner.setPrototypeKuduInputOperator(mockedInputOperator);
+ Map<Integer,List<KuduPartitionScanAssignmentMeta>> assignedPartitions = kuduOneToOnePartitioner.assign(
+ kuduOneToOnePartitioner.getListOfPartitionAssignments(
+ new ArrayList<Partitioner.Partition<AbstractKuduInputOperator>>(),
+ partitioningContext),partitioningContext);
+ assertThat(assignedPartitions.size(), is(12));
+ for (List<KuduPartitionScanAssignmentMeta> eachOperatorassignment: assignedPartitions.values()) {
+ assertThat(eachOperatorassignment.size(), is(1));
+ }
+ }
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScannerTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScannerTest.java
new file mode 100644
index 0000000..0e6fcd3
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/AbstractKuduPartitionScannerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+import org.apache.kudu.ColumnSchema;
+
+import static org.junit.Assert.assertEquals;
+
+public class AbstractKuduPartitionScannerTest extends KuduInputOperatorCommons
+{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void textPrepaprePlanForScanners() throws Exception
+ {
+ // no predicates test
+ initOperatorState();
+ AbstractKuduPartitionScanner<UnitTestTablePojo, InputOperatorControlTuple> scanner =
+ unitTestStepwiseScanInputOperator.getScanner();
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ "select introwkey as intColumn from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ List<KuduPartitionScanAssignmentMeta> scansForThisQuery = scanner.preparePlanForScanners(translator);
+ // No predicates and hence a full scan and 1-1 implies one partition for this attempt
+ assertEquals(1,scansForThisQuery.size());
+ // many to one partitioner and no predicates
+ numberOfKuduInputOperatorPartitions = 4;
+ partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+ initOperatorState();
+ translator = new SQLToKuduPredicatesTranslator(
+ "select introwkey as intColumn from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ scanner = unitTestStepwiseScanInputOperator.getScanner();
+ scansForThisQuery = scanner.preparePlanForScanners(translator);
+ assertEquals(3,scansForThisQuery.size()); // 12 kudu partitions over 4 Apex partitions = 3
+ // many to one partitioner with an equality predicate on the int row key
+ numberOfKuduInputOperatorPartitions = 4;
+ partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+ initOperatorState();
+ translator = new SQLToKuduPredicatesTranslator(
+ "select introwkey as intColumn from unittests where introwkey = 1",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ scanner = unitTestStepwiseScanInputOperator.getScanner();
+ scansForThisQuery = scanner.preparePlanForScanners(translator);
+ // This query will actually result in two tablet scans as there is a hash partition involved as well apart from
+ // range partitioning. However this operator is getting one scan as part of the pie assignment. Hence assert 1
+ assertEquals(1,scansForThisQuery.size());
+
+ //revert all changes back for subsequent tests
+ numberOfKuduInputOperatorPartitions = 5;
+ partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testForverifyConnectionStaleness() throws Exception
+ {
+ initOperatorState();
+ AbstractKuduPartitionScanner<UnitTestTablePojo, InputOperatorControlTuple> scanner =
+ unitTestStepwiseScanInputOperator.getScanner();
+ scanner.connectionPoolForThreads.get(0).close(); // forcefully close
+ assertEquals(true,scanner.connectionPoolForThreads.get(0).getKuduSession().isClosed());
+ scanner.verifyConnectionStaleness(0);
+ assertEquals(false,scanner.connectionPoolForThreads.get(0).getKuduSession().isClosed());
+ }
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallableTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallableTest.java
new file mode 100644
index 0000000..8d140c6
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/scanner/KuduPartitionScannerCallableTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.scanner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kudu.InputOperatorControlTuple;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+import org.apache.apex.malhar.kudu.UnitTestTablePojo;
+import org.apache.apex.malhar.kudu.partitioner.KuduPartitionScanStrategy;
+import org.apache.apex.malhar.kudu.sqltranslator.SQLToKuduPredicatesTranslator;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+import org.apache.kudu.ColumnSchema;
+
+import static org.junit.Assert.assertEquals;
+
+public class KuduPartitionScannerCallableTest extends KuduInputOperatorCommons
+{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testSettersForPojo() throws Exception
+ {
+ initOperatorState();
+ AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> currentScanner =
+ unitTestStepwiseScanInputOperator.getScanner();
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ "select introwkey as intColumn from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ List<KuduPartitionScanAssignmentMeta> scansForThisQuery = currentScanner.preparePlanForScanners(translator);
+ KuduPartitionScannerCallable<UnitTestTablePojo,InputOperatorControlTuple> threadToScan = new
+ KuduPartitionScannerCallable<>(unitTestStepwiseScanInputOperator,scansForThisQuery.get(0),
+ currentScanner.getConnectionPoolForThreads().get(0),
+ unitTestStepwiseScanInputOperator.extractSettersForResultObject(translator),translator);
+ long countOfScan = threadToScan.call();
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testRowScansForAllDataAcrossAllPartitions() throws Exception
+ {
+ partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+ numberOfKuduInputOperatorPartitions = 1;
+ initOperatorState();
+ AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> currentScanner =
+ unitTestStepwiseScanInputOperator.getScanner();
+ // truncate and add some data to the unit test table
+ truncateTable();
+ addTestDataRows(10); // This is per partition and there are 12 partitions
+ assertEquals((KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE * 10 ),countNumRowsInTable());
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ "select * from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ List<KuduPartitionScanAssignmentMeta> scansForThisQuery = currentScanner.preparePlanForScanners(translator);
+ // Now scan for exact match of counts
+ long totalRowsRead = 0;
+ unitTestStepwiseScanInputOperator.getBuffer().clear();
+ for (KuduPartitionScanAssignmentMeta aSegmentToScan : scansForThisQuery) {
+ KuduPartitionScannerCallable<UnitTestTablePojo,InputOperatorControlTuple> threadToScan = new
+ KuduPartitionScannerCallable<>(unitTestStepwiseScanInputOperator,aSegmentToScan,
+ currentScanner.getConnectionPoolForThreads().get(0),
+ unitTestStepwiseScanInputOperator.extractSettersForResultObject(translator),translator);
+ totalRowsRead += threadToScan.call();
+ }
+ // 144 = 120 records + 12 * 2 markers
+ int expectedCount = ( 10 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE) +
+ ( 2 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE);
+ assertEquals(expectedCount,unitTestStepwiseScanInputOperator.getBuffer().size());
+ // revert all configs to default
+ partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+ numberOfKuduInputOperatorPartitions = 5;
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = true)
+ @Test
+ public void testRowScansForAllDataInSinglePartition() throws Exception
+ {
+ partitonScanStrategy = KuduPartitionScanStrategy.MANY_TABLETS_PER_OPERATOR;
+ initOperatorState();
+ AbstractKuduPartitionScanner<UnitTestTablePojo,InputOperatorControlTuple> currentScanner =
+ unitTestStepwiseScanInputOperator.getScanner();
+ // truncate and add some data to the unit test table
+ truncateTable();
+ addTestDataRows(10); // This is per partition and there are 12 partitions
+ assertEquals((10 * KuduClientTestCommons.TOTAL_KUDU_TABLETS_FOR_UNITTEST_TABLE),countNumRowsInTable());
+ int intRowBoundary = Integer.MAX_VALUE / SPLIT_COUNT_FOR_INT_ROW_KEY; // 5 to allow for the scan to fall in the lower range
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ "select * from unittests where introwkey < " + intRowBoundary,
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ List<KuduPartitionScanAssignmentMeta> scansForThisQuery = currentScanner.preparePlanForScanners(translator);
+ // Now scan for exact match of counts
+ long totalRowsRead = 0;
+ unitTestStepwiseScanInputOperator.getBuffer().clear();
+ for (KuduPartitionScanAssignmentMeta aSegmentToScan : scansForThisQuery) {
+ KuduPartitionScannerCallable<UnitTestTablePojo,InputOperatorControlTuple> threadToScan = new
+ KuduPartitionScannerCallable<>(unitTestStepwiseScanInputOperator,aSegmentToScan,
+ currentScanner.getConnectionPoolForThreads().get(0),
+ unitTestStepwiseScanInputOperator.extractSettersForResultObject(translator),translator);
+ totalRowsRead += threadToScan.call();
+ }
+ // 23 because of the hash distributions and the scan markers. 21 are data records and 2 are end of scan markers
+ assertEquals(23L,unitTestStepwiseScanInputOperator.getBuffer().size());
+ // revert all configs to default
+ partitonScanStrategy = KuduPartitionScanStrategy.ONE_TABLET_PER_OPERATOR;
+ numberOfKuduInputOperatorPartitions = 5;
+ }
+
+
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslatorTest.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslatorTest.java
new file mode 100644
index 0000000..59936fa
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/sqltranslator/SQLToKuduPredicatesTranslatorTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.sqltranslator;
+
+import java.util.ArrayList;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.test.KuduClusterAvailabilityTestRule;
+import org.apache.apex.malhar.kudu.test.KuduClusterTestContext;
+import org.apache.kudu.ColumnSchema;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+public class SQLToKuduPredicatesTranslatorTest extends KuduClientTestCommons
+{
+ @Rule
+ public KuduClusterAvailabilityTestRule kuduClusterAvailabilityTestRule = new KuduClusterAvailabilityTestRule();
+
+ @KuduClusterTestContext(kuduClusterBasedTest = false)
+ @Test
+ public void testForCompletenessOfSQLExpression() throws Exception
+ {
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ "select from unittests",new ArrayList<ColumnSchema>(columnDefs.values()));
+ translator.parseKuduExpression();
+ KuduSQLExpressionErrorListener errorListener = translator.getErrorListener();
+ assertEquals(true,errorListener.isSyntaxError());
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = false)
+ @Test
+ public void testForErrorsInColumnAliasesInSQLExpression() throws Exception
+ {
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ "select intkey as intColumn from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ KuduSQLExpressionErrorListener errorListener = translator.getErrorListener();
+ assertEquals(false,errorListener.isSyntaxError());
+
+ translator = new SQLToKuduPredicatesTranslator(
+ "select intkey as intColumn, 'from' as fgh from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ errorListener = translator.getErrorListener();
+ assertEquals(false,errorListener.isSyntaxError());
+
+ translator = new SQLToKuduPredicatesTranslator(
+ "select intkey, 'from' from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ errorListener = translator.getErrorListener();
+ assertEquals(false,errorListener.isSyntaxError());
+
+ translator = new SQLToKuduPredicatesTranslator(
+ "select intkey, 'from' as fgh from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ errorListener = translator.getErrorListener();
+ assertEquals(false,errorListener.isSyntaxError());
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = false)
+ @Test
+ public void testForErrorsInOptionsInSQLExpression() throws Exception
+ {
+ SQLToKuduPredicatesTranslator translator = null;
+ KuduSQLExpressionErrorListener errorListener = null;
+
+ translator = new SQLToKuduPredicatesTranslator(
+ "select intkey as intColumn from unittests using Options",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ errorListener = translator.getErrorListener();
+ assertEquals(true,errorListener.isSyntaxError());
+
+ translator = new SQLToKuduPredicatesTranslator(
+ "select intkey as intColumn, 'from' as fgh from " +
+ " unittests using options READ_SNAPSHOT_TIME = aASDAD",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ errorListener = translator.getErrorListener();
+ assertEquals(true,errorListener.isSyntaxError());
+
+ translator = new SQLToKuduPredicatesTranslator(
+ "select intkey, 'from' from unittests using options READ_SNAPSHOT_TIME = 2342345",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ errorListener = translator.getErrorListener();
+ assertEquals(false,errorListener.isSyntaxError());
+
+ translator = new SQLToKuduPredicatesTranslator(
+ "select intkey, xcv as fgh from unittests using options CONTROLTUPLE_MESSAGE = \"done\"",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ errorListener = translator.getErrorListener();
+ assertEquals(false,errorListener.isSyntaxError());
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = false)
+ @Test
+ public void testForSelectStarInSQLExpression() throws Exception
+ {
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ " select * from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ assertEquals(true,translator.getKuduSQLParseTreeListener().isSelectStarExpressionEnabled());
+
+ translator = new SQLToKuduPredicatesTranslator(
+ "select intkey as intColumn from unittests ",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ assertEquals(false,translator.getKuduSQLParseTreeListener().isSelectStarExpressionEnabled());
+
+ }
+
+ @KuduClusterTestContext(kuduClusterBasedTest = false)
+ @Test
+ public void testForColumnNameExtractionInSQLExpression() throws Exception
+ {
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ " select introwkey as intColumn, ' from' as flColumn, stringCol from unittests",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ assertEquals(1, translator.getKuduSQLParseTreeListener().getListOfColumnsUsed().size());
+ assertEquals(9, translator.getKuduSQLParseTreeListener().getAliases().size());
+ assertEquals("intColumn", translator.getKuduSQLParseTreeListener().getAliases().get("introwkey"));
+ }
+
+ @Test
+ @KuduClusterTestContext(kuduClusterBasedTest = false)
+ public void testForReadSnapshotTimeExpression() throws Exception
+ {
+ SQLToKuduPredicatesTranslator translator = new SQLToKuduPredicatesTranslator(
+ " select introwkey as intColumn using options read_snapshot_time = 12345",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ assertEquals(12345L, translator.getKuduSQLParseTreeListener().getReadSnapshotTime().longValue());
+ SQLToKuduPredicatesTranslator translatorForNoReadSnapshotTime = new SQLToKuduPredicatesTranslator(
+ " select introwkey as intColumn",
+ new ArrayList<ColumnSchema>(columnDefs.values()));
+ assertEquals(null, translatorForNoReadSnapshotTime.getKuduSQLParseTreeListener().getReadSnapshotTime());
+
+ }
+}
diff --git a/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterAvailabilityTestRule.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterAvailabilityTestRule.java
new file mode 100644
index 0000000..1b771ab
--- /dev/null
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterAvailabilityTestRule.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kudu.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.kudu.BaseKuduOutputOperator;
+import org.apache.apex.malhar.kudu.IncrementalStepScanInputOperator;
+import org.apache.apex.malhar.kudu.KuduClientTestCommons;
+import org.apache.apex.malhar.kudu.KuduInputOperatorCommons;
+
+/**
+ * A JUnit rule that helps in bypassing tests that cannot be run if the Kudu cluster is not present.
+ */
+public class KuduClusterAvailabilityTestRule implements TestRule
+{
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(KuduClusterAvailabilityTestRule.class);
+
+ public static final String KUDU_MASTER_HOSTS_STRING_ENV = "kudumasterhosts";
+ @Override
+ public Statement apply(Statement base, Description description)
+ {
+ KuduClusterTestContext testContext = description.getAnnotation(KuduClusterTestContext.class);
+ String masterHostsValue = System.getProperty(KUDU_MASTER_HOSTS_STRING_ENV);
+ boolean runThisTest = true; // default is to run the test if no annotation is specified
+ if ( testContext != null) {
+ if ((masterHostsValue != null) && (testContext.kuduClusterBasedTest())) {
+ runThisTest = true;
+ }
+ if ((masterHostsValue != null) && (!testContext.kuduClusterBasedTest())) {
+ runThisTest = false;
+ }
+ if ((masterHostsValue == null) && (testContext.kuduClusterBasedTest())) {
+ runThisTest = false;
+ }
+ if ((masterHostsValue == null) && (!testContext.kuduClusterBasedTest())) {
+ runThisTest = true;
+ }
+ }
+ if (runThisTest) {
+ // call the before class equivalent for real kudu cluster
+ if ((testContext != null ) && (testContext.kuduClusterBasedTest())) {
+ try {
+ initRealKuduClusterSetup(masterHostsValue);
+ } catch (Exception e) {
+ throw new RuntimeException("Not able to initialize kudu cluster",e);
+ }
+ } else {
+ // call the before class equivalent for mocked kudu cluster
+ try {
+ KuduInputOperatorCommons.initCommonConfigsForAllTypesOfTests();
+ } catch (Exception e) {
+ throw new RuntimeException("Could not initialize commn configs required for Kudu cluster connectivity",e);
+ }
+ }
+ // Run the original test
+ return base;
+ } else {
+ // bypass the test altogether
+ return new Statement()
+ {
+
+ @Override
+ public void evaluate() throws Throwable
+ {
+ // Return an empty Statement object for those tests
+ }
+ };
+ }
+
+ }
+
+ private void initRealKuduClusterSetup(String masterHosts) throws Exception
+ {
+ if (!KuduClientTestCommons.tableInitialized) {
+ synchronized (KuduClientTestCommons.objectForLocking) { // test rigs can be parallelized
+ if (!KuduClientTestCommons.tableInitialized) {
+ KuduClientTestCommons.setKuduMasterAddresses(masterHosts);
+ KuduClientTestCommons.setup();
+ initPropertiesFilesForBaseKuduOutputOperator(masterHosts);
+ initPropertiesFilesForIncrementalStepScanInputOperator(masterHosts);
+ KuduClientTestCommons.tableInitialized = true;
+ }
+ }
+ }
+ }
+
+ private void initPropertiesFilesForBaseKuduOutputOperator(String masterHosts)
+ {
+ BaseKuduOutputOperator.DEFAULT_CONNECTION_PROPS_FILE_NAME = rewritePropsFile(
+ masterHosts,BaseKuduOutputOperator.DEFAULT_CONNECTION_PROPS_FILE_NAME);
+ }
+
+ private void initPropertiesFilesForIncrementalStepScanInputOperator(String masterHosts)
+ {
+ IncrementalStepScanInputOperator.DEFAULT_CONNECTION_PROPS_FILE_NAME = rewritePropsFile(
+ masterHosts,IncrementalStepScanInputOperator.DEFAULT_CONNECTION_PROPS_FILE_NAME);
+ }
+
+ /**
+ * Creates a temporary properties file that sets the kudu cluster master hosts value as given in the command line.
+ * @param masterHosts The Kudu cluster master hosts value to write into the properties file
+ * @param originalFileName File name as would be picked by the class loader
+ * @return Absolute path of the rewritten temporary properties file, or the original file name if the temp file could not be created
+ */
+ private String rewritePropsFile(String masterHosts, String originalFileName)
+ {
+ Properties kuduConnectionProperties = new Properties();
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ InputStream kuduPropsFileAsStream = loader.getResourceAsStream(originalFileName);
+ File tempPropertiesFile = null;
+ try {
+ tempPropertiesFile = File.createTempFile("kuduclusterconfig-" + System.currentTimeMillis(),
+ ".properties");
+ LOG.info(tempPropertiesFile.getAbsolutePath() + " is being used as a temporary props file");
+ } catch (IOException e) {
+ LOG.error("Not able to create temp file",e);
+ return originalFileName;
+ }
+ if (kuduPropsFileAsStream != null) {
+ try {
+ kuduConnectionProperties.load(kuduPropsFileAsStream);
+ kuduPropsFileAsStream.close();
+ FileOutputStream toBeModifiedFileHandle = new FileOutputStream(tempPropertiesFile);
+ kuduConnectionProperties.put(BaseKuduOutputOperator.MASTER_HOSTS,masterHosts);
+ kuduConnectionProperties.store(toBeModifiedFileHandle,"updated by unit test");
+ toBeModifiedFileHandle.close();
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ return tempPropertiesFile.getAbsolutePath();
+ }
+
+}
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterTestContext.java
similarity index 62%
rename from contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
rename to kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterTestContext.java
index 64b46c6..80ca0af 100644
--- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java
+++ b/kudu/src/test/java/org/apache/apex/malhar/kudu/test/KuduClusterTestContext.java
@@ -16,17 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.malhar.contrib.kudu;
+package org.apache.apex.malhar.kudu.test;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
/**
- * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the
- * mutation being represented by the current tuple</p>
+ * Annotation that helps in selectively triggering certain tests if a Kudu cluster is available
+ * at the time the tests are launched
*/
-public enum KuduMutationType
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface KuduClusterTestContext
{
- INSERT,
- DELETE,
- UPDATE,
- UPSERT
+ boolean kuduClusterBasedTest() default false;
+
}
diff --git a/contrib/src/test/resources/kuduoutputoperator.properties b/kudu/src/test/resources/kuduincrementalstepscaninputoperator.properties
similarity index 70%
copy from contrib/src/test/resources/kuduoutputoperator.properties
copy to kudu/src/test/resources/kuduincrementalstepscaninputoperator.properties
index 8f41c63..ddf5068 100644
--- a/contrib/src/test/resources/kuduoutputoperator.properties
+++ b/kudu/src/test/resources/kuduincrementalstepscaninputoperator.properties
@@ -17,6 +17,11 @@
# under the License.
#
-masterhosts=192.168.2.141:7051,192.168.2.133:7051
+masterhosts=192.168.1.41:7051
tablename=unittests
-pojoclassname=org.apache.apex.malhar.contrib.kudu.UnitTestTablePojo
\ No newline at end of file
+templatequerystring=SELECT * FROM unittests WHERE introwkey > 1234 AND timestamprowkey >= :lowerbound AND timestamprowkey < :upperbound
+templatequerylowerboundparamname = :lowerbound
+templatequeryupperboundparamname = :upperbound
+templatequeryseedvalue = 1503001033219
+templatequeryincrementalvalue = 120000
+
diff --git a/contrib/src/test/resources/kuduoutputoperator.properties b/kudu/src/test/resources/kuduoutputoperator.properties
similarity index 87%
rename from contrib/src/test/resources/kuduoutputoperator.properties
rename to kudu/src/test/resources/kuduoutputoperator.properties
index 8f41c63..1c61d38 100644
--- a/contrib/src/test/resources/kuduoutputoperator.properties
+++ b/kudu/src/test/resources/kuduoutputoperator.properties
@@ -17,6 +17,6 @@
# under the License.
#
-masterhosts=192.168.2.141:7051,192.168.2.133:7051
+masterhosts=192.168.1.41:7051
tablename=unittests
-pojoclassname=org.apache.apex.malhar.contrib.kudu.UnitTestTablePojo
\ No newline at end of file
+pojoclassname=org.apache.apex.malhar.kudu.UnitTestTablePojo
diff --git a/kudu/src/test/resources/log4j.properties b/kudu/src/test/resources/log4j.properties
new file mode 100644
index 0000000..972d14d
--- /dev/null
+++ b/kudu/src/test/resources/log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.kudu=WARN
+log4j.logger.org.apache.apex.malhar.kudu=DEBUG
diff --git a/pom.xml b/pom.xml
index 298576d..d824587 100644
--- a/pom.xml
+++ b/pom.xml
@@ -214,6 +214,7 @@
<module>library</module>
<module>contrib</module>
<module>kafka</module>
+ <module>kudu</module>
<module>examples</module>
</modules>
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].