Posted to commits@apex.apache.org by th...@apache.org on 2017/07/02 19:04:57 UTC

[4/5] apex-malhar git commit: APEXMALHAR-2459 1)Refactor the existing Kafka Input Operator. 2)Added the support of KafkaInputOperator using 0.10 consumer API

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
new file mode 100644
index 0000000..05faab6
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.common.PartitionInfo;
+
+import com.google.common.collect.Sets;
+
+/**
+ * A one-to-one partitioner implementation that always creates the same number of operator partitions as
+ * there are Kafka partitions for the topics the operator subscribes to
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public class OneToOnePartitioner extends AbstractKafkaPartitioner
+{
+
+  public OneToOnePartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator)
+  {
+    super(clusters, topics, prototypeOperator);
+  }
+
+  @Override
+  List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata)
+  {
+    List<Set<PartitionMeta>> currentAssignment = new LinkedList<>();
+    for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
+      for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
+        for (PartitionInfo pif : topicPartition.getValue()) {
+          currentAssignment.add(Sets.newHashSet(new PartitionMeta(clusterMap.getKey(),
+              topicPartition.getKey(), pif.partition())));
+        }
+      }
+    }
+    return currentAssignment;
+  }
+
+}

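For illustration, a minimal sketch of the assignment produced above, assuming a single
cluster "localhost:9092" and a two-partition topic "apexTest" (both placeholders). The
Node arguments of PartitionInfo do not feed into PartitionMeta and are left null, and the
snippet assumes it lives in the same package so the package-private assign() is visible:

    Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
    metadata.put("localhost:9092", Collections.singletonMap("apexTest", Arrays.asList(
        new PartitionInfo("apexTest", 0, null, null, null),
        new PartitionInfo("apexTest", 1, null, null, null))));

    // a null prototype operator is enough for this illustration (assumption)
    OneToOnePartitioner partitioner = new OneToOnePartitioner(
        new String[]{"localhost:9092"}, new String[]{"apexTest"}, null);

    // yields two singleton sets, i.e. one operator partition per Kafka partition
    List<Set<PartitionMeta>> assignment = partitioner.assign(metadata);
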
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
new file mode 100644
index 0000000..feafa3b
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public enum PartitionStrategy
+{
+  /**
+   * Each operator partition connects to only one Kafka partition
+   */
+  ONE_TO_ONE,
+  /**
+   * Each operator consumes from several Kafka partitions with the overall input rate kept under
+   * a certain hard limit in msgs/s or bytes/s
+   * For now it <b>only</b> supports the <b>simple kafka consumer</b>
+   */
+  ONE_TO_MANY,
+  /**
+   * 1-to-N partitioning based on a heuristic function
+   * <b>NOT</b> implemented yet
+   * TODO implement this later
+   */
+  ONE_TO_MANY_HEURISTIC
+}

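The strategy is selected on the input operator by name; the tests further down pass the
lower-case names "one_to_one" and "one_to_many" to setStrategy(). A one-line sketch:

    KafkaSinglePortInputOperator op = new KafkaSinglePortInputOperator();
    op.setStrategy(PartitionStrategy.ONE_TO_ONE.name()); // equivalently "one_to_one", as in the tests
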
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumerPropertiesTest.java b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..c745bea
--- /dev/null
+++ b/kafka/kafka-common/src/test/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumerPropertiesTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.text.ParseException;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.kafka.common.KafkaException;
+
+import com.datatorrent.api.Context;
+
+public abstract class AbstractKafkaConsumerPropertiesTest
+{
+
+  public abstract AbstractKafkaInputOperator createKafkaInputOperator();
+
+  public abstract String expectedException();
+
+  AbstractKafkaInputOperator kafkaInput = createKafkaInputOperator();
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  public class Watcher extends TestWatcher
+  {
+    Context.OperatorContext context;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      kafkaInput.setClusters("localhost:8087");
+      kafkaInput.setInitialPartitionCount(1);
+      kafkaInput.setTopics("apexTest");
+      kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+      Properties prop = new Properties();
+      prop.setProperty("security.protocol", "SASL_PLAINTEXT");
+      prop.setProperty("sasl.kerberos.service.name", "kafka");
+      kafkaInput.setConsumerProps(prop);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+    }
+  }
+
+  @Test
+  public void testConsumerProperties() throws ParseException
+  {
+    // This check ensures the consumer properties are set and not reset in between.
+    if (null != kafkaInput.getConsumerProps().get("security.protocol")) {
+      try {
+        kafkaInput.definePartitions(null, null);
+      } catch (KafkaException e) {
+        // Ensures the properties of the consumer are set and not reset.
+        Assert.assertEquals(
+            expectedException(), e.getCause().getMessage());
+      }
+    }
+  }
+}

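The pass-through this test exercises is also how an application would hand a secured
consumer configuration to the operator; a sketch grounded in the setters used above
(broker address and topic are placeholders):

    KafkaSinglePortInputOperator op = new KafkaSinglePortInputOperator();
    op.setClusters("localhost:9092");
    op.setTopics("apexTest");
    op.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());

    Properties props = new Properties();
    props.setProperty("security.protocol", "SASL_PLAINTEXT");
    props.setProperty("sasl.kerberos.service.name", "kafka");
    op.setConsumerProps(props); // handed to the wrapped KafkaConsumer unchanged
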
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/test/resources/log4j.properties b/kafka/kafka-common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..71ac284
--- /dev/null
+++ b/kafka/kafka-common/src/test/resources/log4j.properties
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org=INFO
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka.consumer=WARN
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.zookeeper=WARN

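As the comment in the file notes, the SYSLOG appender is defined but not attached;
enabling it only requires adding it to the root logger:

    log4j.rootLogger=DEBUG,CONSOLE,SYSLOG
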
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/kafka/kafka010/XmlJavadocCommentsExtractor.xsl b/kafka/kafka010/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..ec72325
--- /dev/null
+++ b/kafka/kafka010/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attrbutes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/kafka010/pom.xml b/kafka/kafka010/pom.xml
new file mode 100755
index 0000000..02be496
--- /dev/null
+++ b/kafka/kafka010/pom.xml
@@ -0,0 +1,98 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-kafka-connectors</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-kafka010</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache Apex Malhar Kafka Support using 0.10 Consumer API</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.10.2.1</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.10.2.1</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-kafka-common</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka_2.11</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-kafka-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka_2.11</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.10.2.1</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer010.java b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer010.java
new file mode 100644
index 0000000..fb4115a
--- /dev/null
+++ b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer010.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Wrapper for 0.10.x version of Kafka consumer
+ */
+@InterfaceStability.Evolving
+public class KafkaConsumer010 implements AbstractKafkaConsumer
+{
+
+  private KafkaConsumer<byte[], byte[]> consumer;
+
+  public KafkaConsumer010(Properties properties)
+  {
+    consumer = new KafkaConsumer<>(properties);
+  }
+
+  /**
+   * Checks whether the consumer contains the specified partition or not
+   * @param topicPartition topic partition
+   * @return true if consumer contains the given partition, otherwise false
+   */
+  @Override
+  public boolean isConsumerContainsPartition(TopicPartition topicPartition)
+  {
+    return consumer.assignment().contains(topicPartition);
+  }
+
+  /**
+   * Seek to the specified offset for the given partition
+   * @param topicPartition topic partition
+   * @param offset given offset
+   */
+  @Override
+  public void seekToOffset(TopicPartition topicPartition, long offset)
+  {
+    consumer.seek(topicPartition, offset);
+  }
+
+  /**
+   * Fetch data for the topics or partitions specified using assign API.
+   * @param timeOut time in milliseconds, spent waiting in poll if data is not available in buffer.
+   * @return records
+   */
+  @Override
+  public ConsumerRecords<byte[], byte[]> pollRecords(long timeOut)
+  {
+    return consumer.poll(timeOut);
+  }
+
+  /**
+   * Commit the specified offsets for the specified list of topics and partitions to Kafka.
+   * @param offsets given offsets
+   * @param callback Callback to invoke when the commit completes
+   */
+  @Override
+  public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
+  {
+    consumer.commitAsync(offsets, callback);
+  }
+
+  /**
+   * Assign the specified list of partitions to the consumer
+   * @param partitions list of partitions
+   */
+  @Override
+  public void assignPartitions(List<TopicPartition> partitions)
+  {
+    consumer.assign(partitions);
+  }
+
+  /**
+   * Seek to the first offset for the specified list of partitions
+   * @param partitions list of partitions
+   */
+  @Override
+  public void seekToBeginning(TopicPartition... partitions)
+  {
+    consumer.seekToBeginning(Arrays.asList(partitions));
+  }
+
+  /**
+   * Seek to the last offset for the specified list of partitions
+   * @param partitions list of partitions
+   */
+  @Override
+  public void seekToEnd(TopicPartition... partitions)
+  {
+    consumer.seekToEnd(Arrays.asList(partitions));
+  }
+
+  /**
+   * Wrapper for waking up the consumer
+   */
+  @Override
+  public void wakeup()
+  {
+    consumer.wakeup();
+  }
+
+  /**
+   * Return the metrics kept by the consumer
+   * @return metrics
+   */
+  @Override
+  public Map<MetricName, ? extends Metric> metrics()
+  {
+    return consumer.metrics();
+  }
+
+  /**
+   * Wrapper for closing the consumer
+   */
+  @Override
+  public void close()
+  {
+    consumer.close();
+  }
+
+  /**
+   * Resume all the partitions
+   */
+  @Override
+  public void resumeAllPartitions()
+  {
+    consumer.resume(this.getPartitions());
+  }
+
+  /**
+   * Return the list of partitions assigned to this consumer
+   * @return list of partitions
+   */
+  @Override
+  public Collection<TopicPartition> getPartitions()
+  {
+    return consumer.assignment();
+  }
+
+  /**
+   * Resume the specified partition
+   * @param tp partition
+   */
+  @Override
+  public void resumePartition(TopicPartition tp)
+  {
+    consumer.resume(Arrays.asList(tp));
+  }
+
+  /**
+   * Pause the specified partition
+   * @param tp partition
+   */
+  @Override
+  public void pausePartition(TopicPartition tp)
+  {
+    consumer.pause(Arrays.asList(tp));
+  }
+
+  /**
+   * Return the offset of the next record that will be fetched
+   * @param tp partition
+   */
+  @Override
+  public long positionPartition(TopicPartition tp)
+  {
+    return consumer.position(tp);
+  }
+}

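A minimal standalone sketch of driving the wrapper directly, mirroring the
assign-then-poll sequence the input operator uses; the broker address, topic, and class
name are placeholders, and byte-array deserializers are assumed:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class Consumer010Sketch
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer010 consumer = new KafkaConsumer010(props);
        TopicPartition tp = new TopicPartition("apexTest", 0);    // placeholder topic
        consumer.assignPartitions(Arrays.asList(tp));
        consumer.seekToBeginning(tp);

        ConsumerRecords<byte[], byte[]> records = consumer.pollRecords(1000L);
        for (ConsumerRecord<byte[], byte[]> r : records) {
          System.out.println(r.partition() + "@" + r.offset() + ": " + new String(r.value()));
        }
        consumer.close();
      }
    }
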
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
new file mode 100644
index 0000000..f314660
--- /dev/null
+++ b/kafka/kafka010/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * This is just an example of a single-port operator that emits only byte-array messages.
+ * The key and cluster information are ignored.
+ * This class emits the value to the single output port.
+ *
+ */
+@InterfaceStability.Evolving
+public class KafkaSinglePortInputOperator extends AbstractKafkaInputOperator
+{
+  /**
+   * Create the consumer for 0.10.* version of Kafka
+   * @param prop consumer properties
+   * @return consumer
+   */
+  @Override
+  public AbstractKafkaConsumer createConsumer(Properties prop)
+  {
+    return new KafkaConsumer010(prop);
+  }
+
+  /**
+   * This output port emits tuples extracted from Kafka messages.
+   */
+  public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();
+
+  @Override
+  protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
+  {
+    outputPort.emit(message.value());
+  }
+
+}

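Wiring the operator into an application follows the same pattern as the test below; a
sketch with hypothetical names (the application class and broker/topic values are
illustrative only):

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;

    public class KafkaReadApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        KafkaSinglePortInputOperator in =
            dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
        in.setClusters("localhost:9092"); // placeholder; multiple clusters are separated by ';'
        in.setTopics("apexTest");         // placeholder topic
        in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        // connect in.outputPort to any operator exposing a DefaultInputPort<byte[]>, e.g.:
        // dag.addStream("kafkaBytes", in.outputPort, collector.inputPort);
      }
    }
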
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..62c9941
--- /dev/null
+++ b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+public class KafkaConsumerPropertiesTest extends AbstractKafkaConsumerPropertiesTest
+{
+  @Override
+  public AbstractKafkaInputOperator createKafkaInputOperator()
+  {
+    return new KafkaSinglePortInputOperator();
+  }
+
+  @Override
+  public String expectedException()
+  {
+    //return new String("org.apache.kafka.common.KafkaException: Jaas configuration not found");
+    return "java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set";
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
new file mode 100644
index 0000000..09d878d
--- /dev/null
+++ b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * A set of tests to verify that the input operator is automatically partitioned
+ * per Kafka partition. This test launches its
+ * own Kafka cluster.
+ */
+@RunWith(Parameterized.class)
+public class KafkaInputOperatorTest extends KafkaOperatorTestBase
+{
+
+  private int totalBrokers = 0;
+
+  private String partition = null;
+
+  private String testName = "";
+
+  public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
+
+  public class KafkaTestInfo extends TestWatcher
+  {
+    public org.junit.runner.Description desc;
+
+    public String getDir()
+    {
+      String methodName = desc.getMethodName();
+      String className = desc.getClassName();
+      return "target/" + className + "/" + methodName + "/" + testName;
+    }
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.desc = description;
+    }
+  }
+
+  @Rule
+  public final KafkaTestInfo testInfo = new KafkaTestInfo();
+
+  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+  public static Collection<Object[]> testScenario()
+  {
+    return Arrays.asList(new Object[][]{
+      {true, false, "one_to_one"},// multi cluster with single partition
+      {true, false, "one_to_many"},
+      {true, true, "one_to_one"},// multi cluster with multi partitions
+      {true, true, "one_to_many"},
+      {false, true, "one_to_one"}, // single cluster with multi partitions
+      {false, true, "one_to_many"},
+      {false, false, "one_to_one"}, // single cluster with single partitions
+      {false, false, "one_to_many"}
+    });
+  }
+
+  @Before
+  public void before()
+  {
+    testName = TEST_TOPIC + testCounter++;
+    logger.info("before() test case: {}", testName);
+    tupleCollection.clear();
+    //reset count for next new test case
+    k = 0;
+
+    createTopic(0, testName);
+    if (hasMultiCluster) {
+      createTopic(1, testName);
+    }
+
+  }
+
+  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
+  {
+    // This class wants to initialize several Kafka brokers for multiple partitions
+    this.hasMultiCluster = hasMultiCluster;
+    this.hasMultiPartition = hasMultiPartition;
+    int cluster = 1 + (hasMultiCluster ? 1 : 0);
+    totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
+    this.partition = partition;
+  }
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
+  private static List<String> tupleCollection = new LinkedList<>();
+
+  /**
+   * whether the countdown latch counts all tuples or just END_TUPLE markers
+   */
+  private static final boolean countDownAll = false;
+  private static final int scale = 2;
+  private static final int totalCount = 10 * scale;
+  private static final int failureTrigger = 3 * scale;
+  private static final int tuplesPerWindow = 5 * scale;
+  private static final int waitTime = 60000 + 300 * scale;
+
+  //This latch was used to count the END_TUPLEs, but the order of tuples can't be guaranteed,
+  //so count valid tuples instead.
+  private static CountDownLatch latch;
+  private static boolean hasFailure = false;
+  private static int k = 0;
+  private static Thread monitorThread;
+
+  /**
+   * Test operator to collect tuples from KafkaSinglePortInputOperator.
+   */
+  public static class CollectorModule extends BaseOperator
+  {
+    public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
+    {
+      @Override
+      public void process(byte[] bt)
+      {
+        processTuple(bt);
+      }
+    };
+
+    long currentWindowId;
+
+    long operatorId;
+
+    boolean isIdempotentTest = false;
+
+    transient List<String> windowTupleCollector = Lists.newArrayList();
+    private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>();
+    private int endTuples = 0;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      currentWindowId = windowId;
+      windowTupleCollector.clear();
+      endTuples = 0;
+    }
+
+    public void processTuple(byte[] bt)
+    {
+      String tuple = new String(bt);
+      if (hasFailure && k++ == failureTrigger) {
+        //you can only kill yourself once
+        hasFailure = false;
+        throw new RuntimeException();
+      }
+      if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
+        endTuples++;
+      }
+
+      windowTupleCollector.add(tuple);
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      if (isIdempotentTest) {
+        String key = operatorId + "," + currentWindowId;
+        List<String> msgsInWin = tupleCollectedInWindow.get(key);
+        if (msgsInWin != null) {
+          Assert.assertEquals(
+              "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
+        } else {
+          List<String> newList = Lists.newArrayList();
+          newList.addAll(windowTupleCollector);
+          tupleCollectedInWindow.put(key, newList);
+        }
+      }
+
+      //discard the tuples of this window if an exception happened
+      int tupleSize = windowTupleCollector.size();
+      tupleCollection.addAll(windowTupleCollector);
+
+      int countDownTupleSize = countDownAll ? tupleSize : endTuples;
+
+      if (latch != null) {
+        Assert.assertTrue(
+            "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
+        while (countDownTupleSize > 0) {
+          latch.countDown();
+          --countDownTupleSize;
+        }
+        if (latch.getCount() == 0) {
+          /**
+           * The delay between countDown() and the shutdown() of the application
+           * can cause the fatal error:
+           * "Catastrophic Error: Invalid State - the operator blocked forever!"
+           * as the activeQueues could be cleared while alive hasn't changed yet.
+           * Throw the ShutdownException to let the engine shut down.
+           */
+          try {
+            throw new ShutdownException();
+            //lc.shutdown();
+          } finally {
+            /**
+             * Interrupt the engine thread so it wakes from sleep and handles
+             * the shutdown. At this point all payload should have been handled,
+             * so it should be ok to interrupt.
+             */
+            monitorThread.interrupt();
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
+   * data from an outside test generator through the Kafka message bus and feeds that data into the Malhar streaming platform.
+   *
+   * [Generate message and send that to Kafka message bus] ==> [Receive that message through Kafka input adapter(i.e.
+   * consumer) and send using emitTuples() interface on output port]
+   *
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testInputOperator() throws Exception
+  {
+    hasFailure = false;
+    testInputOperator(false, false);
+  }
+
+  @Test
+  public void testInputOperatorWithFailure() throws Exception
+  {
+    hasFailure = true;
+    testInputOperator(true, false);
+  }
+
+  @Test
+  public void testIdempotentInputOperatorWithFailure() throws Exception
+  {
+    hasFailure = true;
+    testInputOperator(true, true);
+  }
+
+  public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
+  {
+    // each broker should get an END_TUPLE message
+    latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
+
+    logger.info(
+        "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
+        " hasMultiPartition: {}, partition: {}",
+        testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
+
+    // Start producer
+    KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
+    p.setSendCount(totalCount);
+    Thread t = new Thread(p);
+    t.start();
+
+    int expectedReceiveCount = totalCount + totalBrokers;
+
+    // Create DAG for testing.
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    // Create KafkaSinglePortStringInputOperator
+    KafkaSinglePortInputOperator node = dag.addOperator(
+        "Kafka input" + testName, KafkaSinglePortInputOperator.class);
+    node.setInitialPartitionCount(1);
+    // set topic
+    node.setTopics(testName);
+    node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+    node.setClusters(getClusterConfig());
+    node.setStrategy(partition);
+    if (idempotent) {
+      node.setWindowDataManager(new FSWindowDataManager());
+    }
+
+    // Create Test tuple collector
+    CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
+    collector.isIdempotentTest = idempotent;
+
+    // Connect ports
+    dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort)
+        .setLocality(Locality.CONTAINER_LOCAL);
+
+    if (hasFailure) {
+      setupHasFailureTest(node, dag);
+    }
+
+    // Create local cluster
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
+    //let the Controller run inside another thread. It is almost the same as calling Controller.runAsync(),
+    //but Controller.runAsync() doesn't expose the thread which runs it,
+    //so we don't know when the thread will be terminated.
+    //create this thread and then call join() to make sure the Controller shuts down completely.
+    monitorThread = new Thread((StramLocalCluster)lc, "master");
+    monitorThread.start();
+
+    boolean notTimeout = true;
+    try {
+      // Wait 60s for the consumer to finish consuming all the messages
+      notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
+      lc.shutdown();
+
+      //wait until control thread finished.
+      monitorThread.join();
+    } catch (Exception e) {
+      logger.warn(e.getMessage());
+    }
+
+    t.join();
+
+    if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
+      logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
+          expectedReceiveCount, testName, tupleCollection);
+    }
+    Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
+        + tupleCollection, notTimeout);
+
+    // Check results
+    Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
+        + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
+        expectedReceiveCount == tupleCollection.size());
+
+    logger.info("End of test case: {}", testName);
+  }
+
+  private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
+  {
+    operator.setHoldingBufferSize(5000);
+    dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+    //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
+    //  APPLICATION_PATH + "failureck", new Configuration()));
+    operator.setMaxTuplesPerWindow(tuplesPerWindow);
+  }
+
+  private String getClusterConfig()
+  {
+    String l = "localhost:";
+    return l + TEST_KAFKA_BROKER_PORT[0] +
+      (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1] : "");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
new file mode 100644
index 0000000..235eeba
--- /dev/null
+++ b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Properties;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import kafka.admin.TopicCommand;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZkUtils;
+
+/**
+ * This is a base class that sets up and cleans the Kafka testing environment for all the input/output tests.
+ * If it's a multi-partition test, this class creates 2 Kafka partitions.
+ */
+public class KafkaOperatorTestBase
+{
+
+  public static final String END_TUPLE = "END_TUPLE";
+  public static final int[] TEST_ZOOKEEPER_PORT;
+  public static final int[] TEST_KAFKA_BROKER_PORT;
+  public static final String TEST_TOPIC = "testtopic";
+  public static int testCounter = 0;
+
+  // get available ports
+  static {
+    ServerSocket[] listeners = new ServerSocket[6];
+    int[] p = new int[6];
+
+    try {
+      for (int i = 0; i < 6; i++) {
+        listeners[i] = new ServerSocket(0);
+        p[i] = listeners[i].getLocalPort();
+      }
+
+      for (int i = 0; i < 6; i++) {
+        listeners[i].close();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
+    TEST_KAFKA_BROKER_PORT = new int[]{p[2], p[3]};
+  }
+
+  static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
+  // since Kafka 0.8, use KafkaServerStartable instead of KafkaServer
+
+  // multiple brokers in multiple cluster
+  private static KafkaServerStartable[] broker = new KafkaServerStartable[2];
+
+  // multiple cluster
+  private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
+
+  private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
+
+  public static String baseDir = "target";
+
+  private static final String zkBaseDir = "zookeeper-server-data";
+  private static final String kafkaBaseDir = "kafka-server-data";
+  private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
+  private static final String[] kafkadir = new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"};
+  protected boolean hasMultiPartition = false;
+  protected boolean hasMultiCluster = false;
+
+  public static void startZookeeper(final int clusterId)
+  {
+    try {
+
+      int numConnections = 100;
+      int tickTime = 2000;
+      File dir = new File(baseDir, zkdir[clusterId]);
+
+      zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
+      zkFactory[clusterId] = new NIOServerCnxnFactory();
+      zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
+
+      zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
+      Thread.sleep(2000);
+      //kserver.startup();
+    } catch (Exception ex) {
+      logger.error(ex.getLocalizedMessage());
+    }
+  }
+
+  public static void stopZookeeper()
+  {
+    for (ZooKeeperServer zs : zkServer) {
+      if (zs != null) {
+        zs.shutdown();
+      }
+    }
+
+    for (ServerCnxnFactory zkf : zkFactory) {
+      if (zkf != null) {
+        zkf.closeAll();
+        zkf.shutdown();
+      }
+    }
+    zkServer = new ZooKeeperServer[2];
+    zkFactory = new ServerCnxnFactory[2];
+  }
+
+  public static void startKafkaServer(int clusterid, int brokerid)
+  {
+    Properties props = new Properties();
+    props.setProperty("broker.id", "" + clusterid * 10 + brokerid);
+    props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid]).toString());
+    props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
+    props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid]);
+    props.setProperty("default.replication.factor", "1");
+    // set this to 50000 to boost the performance so most test data are in memory before flush to disk
+    props.setProperty("log.flush.interval.messages", "50000");
+
+    broker[clusterid] = new KafkaServerStartable(new KafkaConfig(props));
+    broker[clusterid].startup();
+
+  }
+
+  public static void startKafkaServer()
+  {
+
+    FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
+    //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition },
+    //  new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
+    startKafkaServer(0, 0);
+    //startKafkaServer(0, 1);
+    startKafkaServer(1, 0);
+    //startKafkaServer(1, 1);
+
+    // startup is an asynchronous operation. wait 2 sec for the server to start up
+
+  }
+
+  public static void stopKafkaServer()
+  {
+    for (int i = 0; i < broker.length; i++) {
+      if (broker[i] != null) {
+        broker[i].shutdown();
+        broker[i].awaitShutdown();
+        broker[i] = null;
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void beforeTest()
+  {
+    try {
+      startZookeeper();
+      startKafkaServer();
+    } catch (java.nio.channels.CancelledKeyException ex) {
+      logger.debug("LSHIL {}", ex.getLocalizedMessage());
+    }
+  }
+
+  public static void startZookeeper()
+  {
+    FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
+    startZookeeper(0);
+    startZookeeper(1);
+  }
+
+  public void createTopic(int clusterid, String topicName)
+  {
+    String[] args = new String[9];
+    args[0] = "--zookeeper";
+    args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
+    args[2] = "--replication-factor";
+    args[3] = "1";
+    args[4] = "--partitions";
+    if (hasMultiPartition) {
+      args[5] = "2";
+    } else {
+      args[5] = "1";
+    }
+    args[6] = "--topic";
+    args[7] = topicName;
+    args[8] = "--create";
+
+    ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false);
+    TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
+
+  }
+
+  @AfterClass
+  public static void afterTest()
+  {
+    try {
+      stopKafkaServer();
+      stopZookeeper();
+    } catch (Exception ex) {
+      logger.debug("LSHIL {}", ex.getLocalizedMessage());
+    }
+  }
+
+  public void setHasMultiPartition(boolean hasMultiPartition)
+  {
+    this.hasMultiPartition = hasMultiPartition;
+  }
+
+  public void setHasMultiCluster(boolean hasMultiCluster)
+  {
+    this.hasMultiCluster = hasMultiCluster;
+  }
+
+  public static class TestZookeeperServer extends ZooKeeperServer
+  {
+
+    public TestZookeeperServer()
+    {
+      super();
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
+    {
+      super(snapDir, logDir, tickTime);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
+    {
+      super(txnLogFactory, treeBuilder);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
+        throws IOException
+    {
+      super(txnLogFactory, tickTime, treeBuilder);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
+        int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
+    {
+      super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
+      // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    protected void registerJMX()
+    {
+    }
+
+    @Override
+    protected void unregisterJMX()
+    {
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
new file mode 100644
index 0000000..6098bde
--- /dev/null
+++ b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+import kafka.utils.VerifiableProperties;
+
+/**
+ * A simple partitioner class for test purposes.
+ * The key is an int string.
+ * Messages are distributed across all partitions:
+ * one partition for even numbers, the other for odd.
+ */
+public class KafkaTestPartitioner implements Partitioner
+{
+  public KafkaTestPartitioner(VerifiableProperties props)
+  {
+
+  }
+
+  public KafkaTestPartitioner()
+  {
+
+  }
+
+  @Override
+  public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
+  {
+    int numPartitions = cluster.partitionsForTopic(topic).size();
+    return Integer.parseInt((String)key) % numPartitions;
+  }
+
+  @Override
+  public void close()
+  {
+
+  }
+
+  @Override
+  public void configure(Map<String, ?> map)
+  {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
new file mode 100644
index 0000000..322f070
--- /dev/null
+++ b/kafka/kafka010/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A Kafka producer for testing
+ */
+public class KafkaTestProducer implements Runnable
+{
+  //  private static final Logger logger = LoggerFactory.getLogger(KafkaTestProducer.class);
+  private final Producer<String, String> producer;
+  private final Producer<String, String> producer1;
+  private final String topic;
+  private int sendCount = 20;
+  // to generate a random int as a key for partition
+  private final Random rand = new Random();
+  private boolean hasPartition = false;
+  private boolean hasMultiCluster = false;
+  private List<String> messages;
+
+  // http://kafka.apache.org/documentation.html#producerconfigs
+  private String ackType = "1";
+
+  public int getSendCount()
+  {
+    return sendCount;
+  }
+
+  public void setSendCount(int sendCount)
+  {
+    this.sendCount = sendCount;
+  }
+
+  public void setMessages(List<String> messages)
+  {
+    this.messages = messages;
+  }
+
+  private Properties createProducerConfig(int cid)
+  {
+    Properties props = new Properties();
+    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
+    String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid];
+    brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid]) : "";
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+    props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
+    props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
+    props.setProperty(ProducerConfig.RETRIES_CONFIG, "1");
+    props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+
+    return props;
+  }
+
+  public KafkaTestProducer(String topic)
+  {
+    this(topic, false);
+  }
+
+  public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster)
+  {
+    // Uses KafkaTestPartitioner with a random integer, rendered as a String, as the key.
+    // Both the key and the message are of type String.
+    this.topic = topic;
+    this.hasPartition = hasPartition;
+    this.hasMultiCluster = hasMultiCluster;
+    producer = new KafkaProducer<>(createProducerConfig(0));
+    if (hasMultiCluster) {
+      producer1 = new KafkaProducer<>(createProducerConfig(1));
+    } else {
+      producer1 = null;
+    }
+  }
+
+  public KafkaTestProducer(String topic, boolean hasPartition)
+  {
+    this(topic, hasPartition, false);
+  }
+
+  private transient List<Future<RecordMetadata>> sendTasks = Lists.newArrayList();
+
+  private void generateMessages()
+  {
+    // Create dummy messages
+    int messageNo = 1;
+    while (messageNo <= sendCount) {
+      String messageStr = "_" + messageNo++;
+      int k = rand.nextInt(100);
+      sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr)));
+      if (hasMultiCluster && messageNo <= sendCount) {
+        messageStr = "_" + messageNo++;
+        sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr)));
+      }
+      // logger.debug(String.format("Producing %s", messageStr));
+    }
+    // produce the end tuple to let the test input operator know it's done producing messages
+    for (int i = 0; i < (hasPartition ? 2 : 1); ++i) {
+      sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
+      if (hasMultiCluster) {
+        sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
+      }
+    }
+  }
+
+  @Override
+  public void run()
+  {
+    if (messages == null) {
+      generateMessages();
+    } else {
+      for (String msg : messages) {
+        sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg)));
+      }
+    }
+
+    producer.flush();
+    if (producer1 != null) {
+      producer1.flush();
+    }
+
+    try {
+      for (Future<RecordMetadata> task : sendTasks) {
+        task.get(30, TimeUnit.SECONDS);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    close();
+  }
+
+  public void close()
+  {
+    producer.close();
+    if (producer1 != null) {
+      producer1.close();
+    }
+  }
+
+  public String getAckType()
+  {
+    return ackType;
+  }
+
+  public void setAckType(String ackType)
+  {
+    this.ackType = ackType;
+  }
+} // End of KafkaTestProducer

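In a test, the producer is driven on its own thread, and run() blocks until every send is acknowledged; a minimal usage sketch (the topic name and send count are hypothetical):

    KafkaTestProducer producer = new KafkaTestProducer("test-topic", false, false);
    producer.setSendCount(100);             // generates "_1" .. "_100" plus the end tuple
    Thread sender = new Thread(producer);
    sender.start();
    sender.join();                          // run() flushes, waits up to 30s per send, then closes
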
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka010/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/kafka/kafka010/src/test/resources/log4j.properties b/kafka/kafka010/src/test/resources/log4j.properties
new file mode 100644
index 0000000..71ac284
--- /dev/null
+++ b/kafka/kafka010/src/test/resources/log4j.properties
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org=INFO
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka.consumer=WARN
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.zookeeper=WARN

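As the comment in the file notes, the SYSLOG appender is defined but inactive; enabling it is a one-line change to the root logger (shown here only as an illustration, not part of this commit):

    log4j.rootLogger=DEBUG,CONSOLE,SYSLOG
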
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/kafka/kafka09/XmlJavadocCommentsExtractor.xsl b/kafka/kafka09/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..ec72325
--- /dev/null
+++ b/kafka/kafka09/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/kafka09/pom.xml b/kafka/kafka09/pom.xml
new file mode 100755
index 0000000..532bbfd
--- /dev/null
+++ b/kafka/kafka09/pom.xml
@@ -0,0 +1,86 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-kafka-connectors</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-kafka</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache Apex Malhar Kafka Support using 0.9 Consumer API</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.1</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.9.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-kafka-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-kafka-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.1</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

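A downstream application would consume the 0.9 operators by depending on this artifact; a minimal sketch of the dependency (the version shown simply matches the parent above):

    <dependency>
      <groupId>org.apache.apex</groupId>
      <artifactId>malhar-kafka</artifactId>
      <version>3.8.0-SNAPSHOT</version>
    </dependency>
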
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer09.java b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer09.java
new file mode 100644
index 0000000..faa5171
--- /dev/null
+++ b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumer09.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Wrapper for 0.9.x version of Kafka consumer
+ */
+@InterfaceStability.Evolving
+public class KafkaConsumer09 implements AbstractKafkaConsumer
+{
+  private KafkaConsumer<byte[], byte[]> consumer;
+
+  public KafkaConsumer09(Properties properties)
+  {
+    consumer = new KafkaConsumer<>(properties);
+  }
+
+  /**
+   * Checks whether the specified partition is assigned to this consumer
+   * @param topicPartition topic partition
+   * @return true if the partition is assigned to this consumer, otherwise false
+   */
+  @Override
+  public boolean isConsumerContainsPartition(TopicPartition topicPartition)
+  {
+    return consumer.assignment().contains(topicPartition);
+  }
+
+  /**
+   * Seek to the specified offset for the given partition
+   * @param topicPartition topic partition
+   * @param offset given offset
+   */
+  @Override
+  public void seekToOffset(TopicPartition topicPartition, long offset)
+  {
+    consumer.seek(topicPartition, offset);
+  }
+
+  /**
+   * Fetch data for the topics or partitions specified using assign API.
+   * @param timeOut time in milliseconds to spend waiting in poll if data is not available in the buffer.
+   * @return records
+   */
+  @Override
+  public ConsumerRecords<byte[], byte[]> pollRecords(long timeOut)
+  {
+    return consumer.poll(timeOut);
+  }
+
+  /**
+   * Commit the specified offsets for the specified list of topics and partitions to Kafka.
+   * @param offsets given offsets
+   * @param callback Callback to invoke when the commit completes
+   */
+  @Override
+  public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
+  {
+    consumer.commitAsync(offsets, callback);
+  }
+
+  /**
+   * Assign the specified list of partitions to the consumer
+   * @param partitions list of partitions
+   */
+  @Override
+  public void assignPartitions(List<TopicPartition> partitions)
+  {
+    consumer.assign(partitions);
+  }
+
+  /**
+   * Seek to the first offset for the specified list of partitions
+   * @param partitions list of partitions
+   */
+  @Override
+  public void seekToBeginning(TopicPartition... partitions)
+  {
+    consumer.seekToBeginning(partitions);
+  }
+
+  /**
+   * Seek to the last offset for the specified list of partitions
+   * @param partitions list of partitions
+   */
+  @Override
+  public void seekToEnd(TopicPartition... partitions)
+  {
+    consumer.seekToEnd(partitions);
+  }
+
+  /**
+   * Wake up the consumer
+   */
+  @Override
+  public void wakeup()
+  {
+    consumer.wakeup();
+  }
+
+  /**
+   * Return the metrics kept by the consumer
+   * @return metrics
+   */
+  @Override
+  public Map<MetricName, ? extends Metric> metrics()
+  {
+    return consumer.metrics();
+  }
+
+  /**
+   * Close the consumer
+   */
+  @Override
+  public void close()
+  {
+    consumer.close();
+  }
+
+  /**
+   * Resume all the partitions
+   */
+  @Override
+  public void resumeAllPartitions()
+  {
+    consumer.resume(Iterables.toArray(this.getPartitions(), TopicPartition.class));
+  }
+
+  /**
+   * Return the list of partitions assigned to this consumer
+   * @return list of partitions
+   */
+  @Override
+  public Collection<TopicPartition> getPartitions()
+  {
+    return consumer.assignment();
+  }
+
+  /**
+   * Resume the specified partition
+   * @param tp partition
+   */
+  @Override
+  public void resumePartition(TopicPartition tp)
+  {
+    consumer.resume(tp);
+  }
+
+  /**
+   * Pause the specified partition
+   * @param tp partition
+   */
+  @Override
+  public void pausePartition(TopicPartition tp)
+  {
+    consumer.pause(tp);
+  }
+
+  /**
+   * Return the offset of the next record that will be fetched
+   * @param tp partition
+   * @return the offset of the next record that will be fetched
+   */
+  @Override
+  public long positionPartition(TopicPartition tp)
+  {
+    return consumer.position(tp);
+  }
+}

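The wrapper is constructed from standard 0.9 consumer properties and then driven entirely through the AbstractKafkaConsumer interface; a minimal sketch (broker address, topic, and partition number are hypothetical):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    TopicPartition tp = new TopicPartition("test-topic", 0);
    AbstractKafkaConsumer consumer = new KafkaConsumer09(props);
    consumer.assignPartitions(Collections.singletonList(tp));  // assign directly, no consumer group
    consumer.seekToBeginning(tp);
    ConsumerRecords<byte[], byte[]> records = consumer.pollRecords(100);  // wait up to 100 ms
    consumer.close();
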
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
new file mode 100644
index 0000000..23c519f
--- /dev/null
+++ b/kafka/kafka09/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Kafka output operator with exactly once processing semantics.
+ * <br>
+ *
+ * <p>
+ * <b>Requirements</b>
+ * <li>In the Kafka message, only the value will be available to users</li>
+ * <li>Users need to provide a value deserializer for the Kafka message, as it is used during recovery</li>
+ * <li>The value type should have well-defined equals() and hashCode() methods,
+ * as messages are stored in HashMaps for comparison during recovery.</li>
+ * <p>
+ * <b>Recovery handling</b>
+ * <li> Offsets of the Kafka partitions are stored in the WindowDataManager at the endWindow</li>
+ * <li> During recovery,
+ * <ul>
+ * <li>The partially written streaming window before the crash is reconstructed (explained below)</li>
+ * <li>Tuples from the completed streaming windows are skipped</li>
+ * <li>Tuples coming for the partially written streaming window are skipped.
+ * (No assumption is made on the order or the uniqueness of the tuples)</li>
+ * </ul>
+ * </li>
+ * </p>
+ *
+ * <p>
+ * <b>Partial Window Construction</b>
+ * <li> The operator uses the key of the Kafka message, which is therefore not available to operator users.</li>
+ * <li> The key uniquely identifies the messages written by this particular operator instance.
+ * This allows multiple writers to the same Kafka partitions. The format of the key is "APPLICATION_ID#OPERATOR_ID".</li>
+ * <li>During recovery, Kafka partitions are read from the last written offsets up to the latest offsets.</li>
+ * <li>All the tuples written by this particular instance are kept in a Map</li>
+ * </p>
+ *
+ * <p>
+ * <b>Limitations</b>
+ * <li> The key in the Kafka message is reserved for the operator's use </li>
+ * <li> During recovery, the operator needs to read tuples between two offsets;
+ * if there is a lot of data to be read, the operator may
+ * appear blocked to the Stram, which can then kill the operator. </li>
+ * </p>
+ *
+ * @displayName Kafka Single Port Exactly Once Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ * @since 3.5.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator<String, T>
+    implements Operator.CheckpointNotificationListener
+{
+  private transient String key;
+  private transient String appName;
+  private transient Integer operatorId;
+  private transient Long windowId;
+  private transient Map<T, Integer> partialWindowTuples = new HashMap<>();
+  private transient KafkaConsumer<String, T> consumer;
+
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  private final int KAFKA_CONNECT_ATTEMPT = 10;
+  private final String KEY_SEPARATOR = "#";
+
+  public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+  public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      sendTuple(tuple);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+
+    if (getProperties().getProperty(VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+      throw new IllegalArgumentException(
+          "Value deserializer needs to be set for the operator, as it is used during recovery.");
+    }
+
+    super.setup(context);
+
+    this.operatorId = context.getId();
+    this.windowDataManager.setup(context);
+    this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+    this.key = appName + KEY_SEPARATOR + operatorId;
+
+    this.consumer = kafkaConsumerInit();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.windowId = windowId;
+
+    if (windowId == windowDataManager.getLargestCompletedWindow()) {
+      rebuildPartialWindow();
+    }
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      windowDataManager.committed(windowId);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+    consumer.close();
+    super.teardown();
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+      return;
+    }
+
+    if (!partialWindowTuples.isEmpty()) {
+      throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
+    }
+
+    // Every tuple should be written before the offsets are stored in the window data manager.
+    getProducer().flush();
+
+    try {
+      this.windowDataManager.save(getPartitionsAndOffsets(true), windowId);
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public WindowDataManager getWindowDataManager()
+  {
+    return windowDataManager;
+  }
+
+  public void setWindowDataManager(WindowDataManager windowDataManager)
+  {
+    this.windowDataManager = windowDataManager;
+  }
+
+  private boolean doesKeyBelongToThisInstance(int operatorId, String key)
+  {
+    String[] split = key.split(KEY_SEPARATOR);
+
+    if (split.length != 2) {
+      return false;
+    }
+
+    return (Integer.parseInt(split[1]) == operatorId) && split[0].equals(appName);
+  }
+
+  private boolean alreadyInKafka(T message)
+  {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+      return true;
+    }
+
+    if (partialWindowTuples.containsKey(message)) {
+
+      Integer val = partialWindowTuples.get(message);
+
+      if (val == 0) {
+        return false;
+      } else if (val == 1) {
+        partialWindowTuples.remove(message);
+      } else {
+        partialWindowTuples.put(message, val - 1);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private Map<Integer, Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException
+  {
+    List<PartitionInfo> partitionInfoList = consumer.partitionsFor(getTopic());
+    List<TopicPartition> topicPartitionList = new ArrayList<>();
+
+    for (PartitionInfo partitionInfo : partitionInfoList) {
+      topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()));
+    }
+
+    Map<Integer, Long> partitionsAndOffsets = new HashMap<>();
+    consumer.assign(topicPartitionList);
+
+    for (PartitionInfo partitionInfo : partitionInfoList) {
+      try {
+        TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
+        if (latest) {
+          consumer.seekToEnd(topicPartition);
+        } else {
+          consumer.seekToBeginning(topicPartition);
+        }
+        partitionsAndOffsets.put(partitionInfo.partition(), consumer.position(topicPartition));
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    return partitionsAndOffsets;
+  }
+
+  private void rebuildPartialWindow()
+  {
+    logger.info("Rebuild the partial window after " + windowDataManager.getLargestCompletedWindow());
+
+    Map<Integer, Long> storedOffsets;
+    Map<Integer, Long> currentOffsets;
+
+    try {
+      storedOffsets = (Map<Integer, Long>)this.windowDataManager.retrieve(windowId);
+      currentOffsets = getPartitionsAndOffsets(true);
+    } catch (IOException | ExecutionException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (currentOffsets == null) {
+      logger.info("No tuples found while building partial window " + windowDataManager.getLargestCompletedWindow());
+      return;
+    }
+
+    if (storedOffsets == null) {
+
+      logger.info("Stored offset not available, seeking to the beginning of the Kafka Partition.");
+
+      try {
+        storedOffsets = getPartitionsAndOffsets(false);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    List<TopicPartition> topicPartitions = new ArrayList<>();
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+      topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
+    }
+
+    consumer.assign(topicPartitions);
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+      Long storedOffset = 0L;
+      Integer currentPartition = entry.getKey();
+      Long currentOffset = entry.getValue();
+
+      if (storedOffsets.containsKey(currentPartition)) {
+        storedOffset = storedOffsets.get(currentPartition);
+      }
+
+      if (storedOffset >= currentOffset) {
+        continue;
+      }
+
+      try {
+        consumer.seek(new TopicPartition(getTopic(), currentPartition), storedOffset);
+      } catch (Exception ex) {
+        logger.info("Rebuilding of the partial window is not complete, exactly once recovery is not possible.");
+        throw new RuntimeException(ex);
+      }
+
+      int kafkaAttempt = 0;
+
+      while (true) {
+
+        ConsumerRecords<String, T> consumerRecords = consumer.poll(100);
+
+        if (consumerRecords.count() == 0) {
+          if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) {
+            break;
+          }
+        } else {
+          kafkaAttempt = 0;
+        }
+
+        boolean crossedBoundary = false;
+
+        for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
+
+          if (consumerRecord.offset() >= currentOffset) {
+            crossedBoundary = true;
+            break;
+          }
+
+          if (!doesKeyBelongToThisInstance(operatorId, consumerRecord.key())) {
+            continue;
+          }
+
+          T value = consumerRecord.value();
+
+          if (partialWindowTuples.containsKey(value)) {
+            Integer count = partialWindowTuples.get(value);
+            partialWindowTuples.put(value, count + 1);
+          } else {
+            partialWindowTuples.put(value, 1);
+          }
+
+        }
+
+        if (crossedBoundary) {
+          break;
+        }
+      }
+    }
+  }
+
+  private KafkaConsumer<String, T> kafkaConsumerInit()
+  {
+    Properties props = new Properties();
+
+    props.put(BOOTSTRAP_SERVERS_CONFIG, getProperties().get(BOOTSTRAP_SERVERS_CONFIG));
+    props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, getProperties().get(VALUE_DESERIALIZER_CLASS_CONFIG));
+
+    return new KafkaConsumer<>(props);
+  }
+
+  protected void sendTuple(T tuple)
+  {
+    if (alreadyInKafka(tuple)) {
+      return;
+    }
+
+    getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
+    {
+      @Override
+      public void onCompletion(RecordMetadata metadata, Exception e)
+      {
+        if (e != null) {
+          logger.error("Writing to Kafka failed with an exception: {}", e.getMessage());
+          throw new RuntimeException(e);
+        }
+      }
+    });
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
+}
+
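
To put the operator in context: wiring it into an Apex DAG requires the topic, the broker list, and, because setup() enforces it, a value deserializer for recovery. A minimal sketch, assuming a setTopic(...) setter alongside the setProperty(...) and getTopic() members used above; the operator name, topic, addresses, and upstream port are hypothetical:

    KafkaSinglePortExactlyOnceOutputOperator<String> out =
        dag.addOperator("kafkaExactlyOnceOut", new KafkaSinglePortExactlyOnceOutputOperator<String>());
    out.setTopic("output-topic");
    out.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    out.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // mandatory: the value deserializer is used to rebuild the partial window during recovery
    out.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    dag.addStream("toKafka", upstream.output, out.inputPort);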