You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/05/05 00:40:18 UTC

apex-malhar git commit: APEXMALHAR-2455 Create example for Kafka 0.9 API exactly-once output

Repository: apex-malhar
Updated Branches:
  refs/heads/master 10dd94ef5 -> b5c003c94


APEXMALHAR-2455 Create example for Kafka 0.9 API exactly-once output


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b5c003c9
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b5c003c9
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b5c003c9

Branch: refs/heads/master
Commit: b5c003c94d19b1308f62dc91515e82a660024264
Parents: 10dd94e
Author: Oliver Winke <ol...@datatorrent.com>
Authored: Mon Apr 24 16:58:54 2017 -0700
Committer: Oliver Winke <ol...@datatorrent.com>
Committed: Thu May 4 14:25:15 2017 -0700

----------------------------------------------------------------------
 examples/kafka/README.md                        |  75 +++++++++
 examples/kafka/logicalDAGKafkaExactlyOnce.png   | Bin 0 -> 87679 bytes
 examples/kafka/pom.xml                          |   1 +
 .../kafka/exactlyonceoutput/Application.java    |  94 +++++++++++
 .../BatchSequenceGenerator.java                 | 105 +++++++++++++
 .../PassthroughFailOperator.java                | 155 +++++++++++++++++++
 .../exactlyonceoutput/ValidationToFile.java     | 132 ++++++++++++++++
 .../properties-KafkaExactlyOnceOutput.xml       | 114 ++++++++++++++
 .../exactlyonceoutput/ApplicationTest.java      | 150 ++++++++++++++++++
 9 files changed, 826 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/kafka/README.md b/examples/kafka/README.md
index 1a7a9c4..5454edc 100644
--- a/examples/kafka/README.md
+++ b/examples/kafka/README.md
@@ -14,3 +14,78 @@ them out to a Kafka topic. Each line of the input file is considered a separate
 message. The topic name, the name of the directory that is monitored for input
 files, and other parameters are configurable in `META_INF/properties-hdfs2kafka.xml`.
 
+## Kafka exactly-once output example (Kafka 0.9 API)
+
+This application verifies exactly-once semantics by writing a defined sequence of input data to two Kafka
+output operators -- one that guarantees those semantics and one that does not,
+each writing to a different topic. It deliberately causes the intermediate pass-through
+operator to fail, causing it to be restarted and some tuples to be reprocessed.
+Then two KafkaSinglePortInputOperators read the tuples back from both topics, and a validation
+operator verifies that the former topic has no duplicates but the latter does. It writes a single line
+with the verification results to an HDFS file in the following form:
+
+    Duplicates: exactly-once: 0, at-least-once: 5
+
+NOTE: KafkaInputOperator guarantees at-least-once semantics; in most scenarios
+it also yields exactly-once results, though in rare corner cases duplicate processing
+may occur. When this happens, the validation in this example will report incorrect results.
+
+**DAG of this application:**
+
+![logical DAG of application](logicalDAGKafkaExactlyOnce.png)
+
+Plain text representation of DAG:
+
+    sequenceGenerator --> passthrough ==> {kafkaExactlyOnceOutputOperator, kafkaOutputOperator(at-least-once)}
+
+    {kafkaTopicExactly, kafkaTopicAtLeast} --> validationToFile
+
+
+**Running the Application**
+
+***Run Test***
+
+The application can be run in local mode (`ApplicationTest`, using an embedded Kafka broker), which
+ writes the validation file to `target/exactlyonceoutput/` (file name starting with `validation.txt`).
+
+
+***Run on Cluster***
+
+To run the application on a cluster, a running Kafka service is needed.
+ A local single-node Kafka instance can easily be deployed
+ (see [kafka.apache.org/quickstart](https://kafka.apache.org/quickstart)).
+
+By default, Kafka creates topics automatically when a message
+ to a non-existing topic arrives. If automatic topic creation is disabled, the two
+ topics have to be created manually:
+```shell
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic exactly-once
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic at-least-once
+```
+
+Kafka topics should be cleared/deleted after every run in order for
+ validation to work correctly.
+
+Enable topic deletion in Kafka's server.properties file:
+```
+delete.topic.enable=true
+```
+
+Delete topics:
+```shell
+bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic exactly-once
+bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic at-least-once
+```
+
+Check if deletion was successful:
+```shell
+bin/kafka-topics.sh --list --zookeeper localhost:2181
+```
+
+***Properties***
+
+By default the Kafka broker is set to 'localhost:9092'. To use a different broker, change every
+ broker address in `META-INF/properties-KafkaExactlyOnceOutput.xml` (and `brokerPort` in
+ `ApplicationTest.java` when running the test). The directory for the validation file and the
+ number of tuples to be generated can also be changed in that properties file.
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/logicalDAGKafkaExactlyOnce.png
----------------------------------------------------------------------
diff --git a/examples/kafka/logicalDAGKafkaExactlyOnce.png b/examples/kafka/logicalDAGKafkaExactlyOnce.png
new file mode 100644
index 0000000..0cc8004
Binary files /dev/null and b/examples/kafka/logicalDAGKafkaExactlyOnce.png differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml
index 497a39e..7e607e3 100644
--- a/examples/kafka/pom.xml
+++ b/examples/kafka/pom.xml
@@ -36,6 +36,7 @@
     kafka2hdfs is a example show how to read lines from a Kafka topic using the new (0.9)
     Kafka input operator and write them out to HDFS. 
     hdfs2kafka is a simple application to transfer data from HDFS to Kafka
+    KafkaExactlyOnceOutput example demonstrates exactly-once behavior using KafkaSinglePortExactlyOnceOutputOperator (0.9)
   </description>
   
   <dependencies>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java
new file mode 100644
index 0000000..da03645
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Kafka exactly-once output example (Kafka 0.9 API).
+ *
+ * This application verifies exactly-once semantics by writing a defined sequence of input data to two Kafka
+ * output operators -- one that guarantees those semantics (KafkaSinglePortExactlyOnceOutputOperator) and one
+ * that does not (KafkaSinglePortOutputOperator), each writing to a different topic. It deliberately causes the
+ * intermediate pass-through operator to fail, so that it is restarted and some tuples are reprocessed.
+ * Then two KafkaSinglePortInputOperators read the tuples back from both topics and ValidationToFile verifies
+ * that the former topic has no duplicates but the latter does. It writes a single line with the verification
+ * results to an HDFS file in the following form:
+ *
+ * Duplicates: exactly-once: 0, at-least-once: 5
+ *
+ * Operators:
+ *
+ * **BatchSequenceGenerator:**
+ * Generates a sequence of numbers going from 1 to maxTuplesTotal, which can be set in the properties.
+ *
+ * **PassthroughFailOperator:**
+ * This operator kills itself after a defined number of processed tuples by intentionally throwing an exception.
+ * STRAM will redeploy the operator in a new container. The exception only needs to be thrown once, so a file is
+ * written to HDFS just before throwing the exception and its presence is checked after restart to determine
+ * if the exception was already thrown.
+ *
+ * **KafkaSinglePortExactlyOnceOutputOperator / KafkaSinglePortOutputOperator:**
+ * Topic, bootstrap.servers and (de)serializers are set in properties-KafkaExactlyOnceOutput.xml.
+ * The topic names should not be changed for this application.
+ *
+ * **ValidationToFile:**
+ * Collects the values read from each topic in a separate list. Once the value maxTuplesTotal has been read
+ * from both topics, it calculates the number of duplicates and writes the validation output to HDFS
+ * (the output line is printed in the container's dt.log as well).
+ */
+@ApplicationAnnotation(name = "KafkaExactlyOnceOutput")
+public class Application implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // producer side: sequence -> (intentionally failing) passthrough -> both Kafka output operators
+    BatchSequenceGenerator sequenceGenerator = dag.addOperator("sequenceGenerator", BatchSequenceGenerator.class);
+    PassthroughFailOperator passthroughFailOperator = dag.addOperator("passthrough", PassthroughFailOperator.class);
+    @SuppressWarnings("unchecked")
+    KafkaSinglePortExactlyOnceOutputOperator<String> kafkaExactlyOnceOutputOperator =
+        dag.addOperator("kafkaExactlyOnceOutputOperator", KafkaSinglePortExactlyOnceOutputOperator.class);
+    @SuppressWarnings("unchecked")
+    KafkaSinglePortOutputOperator<String, String> kafkaOutputOperator =
+        dag.addOperator("kafkaOutputOperator", KafkaSinglePortOutputOperator.class);
+
+    dag.addStream("sequenceToPassthrough", sequenceGenerator.out, passthroughFailOperator.input);
+    // the same tuples are written to both topics so that the topics can be compared afterwards
+    dag.addStream("linesToKafka", passthroughFailOperator.output, kafkaOutputOperator.inputPort,
+        kafkaExactlyOnceOutputOperator.inputPort);
+
+    // validation side: read both topics from the beginning and compare them
+    KafkaSinglePortInputOperator kafkaInputTopicExactly = dag.addOperator("kafkaTopicExactly", KafkaSinglePortInputOperator.class);
+    kafkaInputTopicExactly.setInitialOffset(KafkaSinglePortInputOperator.InitialOffset.EARLIEST.name());
+
+    KafkaSinglePortInputOperator kafkaInputTopicAtLeast = dag.addOperator("kafkaTopicAtLeast", KafkaSinglePortInputOperator.class);
+    kafkaInputTopicAtLeast.setInitialOffset(KafkaSinglePortInputOperator.InitialOffset.EARLIEST.name());
+
+    ValidationToFile validationToFile = dag.addOperator("validationToFile", ValidationToFile.class);
+
+    dag.addStream("messagesFromExactly", kafkaInputTopicExactly.outputPort, validationToFile.topicExactlyInput);
+    dag.addStream("messagesFromAtLeast", kafkaInputTopicAtLeast.outputPort, validationToFile.topicAtLeastInput);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java
new file mode 100644
index 0000000..1654434
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Simple input operator that emits the numbers 1 to maxTuplesTotal as Strings, emitting at most
+ * maxTuples tuples per streaming window.
+ * <p>
+ * No tuples are emitted during the first three windows, which makes the tests run reliably.
+ * An optional sleepTime (in milliseconds) throttles emission after each emitted tuple.
+ */
+public class BatchSequenceGenerator extends BaseOperator implements InputOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BatchSequenceGenerator.class);
+
+  // number of windows that stay empty before the first tuple is emitted
+  private static final int WARM_UP_WINDOWS = 3;
+
+  // properties
+
+  @Min(1)
+  private int maxTuplesTotal;     // max number of tuples in total
+  @Min(1)
+  private int maxTuples;          // max number of tuples per window
+  @Min(0)
+  private int sleepTime;          // milliseconds to sleep after each emitted tuple
+
+  // checkpointed state
+
+  private int numTuplesTotal = 0;
+  private int emptyWindowsCount = 0;
+
+  // transient fields
+
+  private transient int numTuples = 0;    // number emitted in current window
+
+  public final transient DefaultOutputPort<String> out = new DefaultOutputPort<>();
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    numTuples = 0;
+    super.beginWindow(windowId);
+    LOG.debug("beginWindow: {}", windowId);
+    ++emptyWindowsCount;
+  }
+
+  /**
+   * Emits the next number of the sequence if the warm-up is over and neither the per-window nor the
+   * total limit has been reached yet.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (numTuplesTotal < maxTuplesTotal && numTuples < maxTuples && emptyWindowsCount > WARM_UP_WINDOWS) {
+      ++numTuplesTotal;
+      ++numTuples;
+      out.emit(String.valueOf(numTuplesTotal));
+      LOG.debug("Line emitted: {}", numTuplesTotal);
+
+      if (sleepTime > 0) {
+        try {
+          // throttle emission so that the tuples are spread over more windows
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep interrupted");
+          // restore the interrupt status so that callers further up can see it
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  public int getMaxTuples()
+  {
+    return maxTuples;
+  }
+
+  public void setMaxTuples(int v)
+  {
+    maxTuples = v;
+  }
+
+  public int getMaxTuplesTotal()
+  {
+    return maxTuplesTotal;
+  }
+
+  public void setMaxTuplesTotal(int v)
+  {
+    maxTuplesTotal = v;
+  }
+
+  public int getSleepTime()
+  {
+    return sleepTime;
+  }
+
+  /**
+   * @param sleepTime milliseconds to sleep after each emitted tuple; 0 (the default) disables throttling
+   */
+  public void setSleepTime(int sleepTime)
+  {
+    this.sleepTime = sleepTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java
new file mode 100644
index 0000000..487c773
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import java.io.IOException;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * To produce an exactly-once scenario the PassthroughFailOperator kills itself after a certain number
+ * of processed lines by throwing an exception. The operator is then redeployed in a new container,
+ * hence tuples that were not checkpointed are passed to the output operators more than once.
+ * <p>
+ * The failure is triggered only once per application: just before throwing, a marker file named after the
+ * application id is created in directoryPath. If that file already exists during setup, all tuples are
+ * passed through.
+ */
+public class PassthroughFailOperator extends BaseOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(PassthroughFailOperator.class);
+
+  // number of windows at the start during which incoming tuples are ignored
+  private static final int WARM_UP_WINDOWS = 3;
+
+  // true once the marker file exists, i.e. the operator has already been killed
+  private boolean killed;
+
+  // number of tuples to pass through before the operator kills itself
+  @Min(0)
+  private int tuplesUntilKill;
+
+  //start with empty windows to ensure tests run reliably
+  private int emptyWindowsCount = 0;
+
+  @NotNull
+  private String directoryPath;
+
+  private String filePath;
+  private transient FileSystem hdfs;
+  private transient Path filePathObj;
+
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+  /**
+   * Opens the file system of the marker file and sets the {@link #killed} flag if the file already exists.
+   *
+   * @param context operator context, used to obtain the application id
+   * @throws RuntimeException if the file system cannot be accessed
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    String appId = context.getValue(Context.DAGContext.APPLICATION_ID);
+    filePath = directoryPath + "/" + appId;
+
+    LOG.info("FilePath: {}", filePath);
+    filePathObj = new Path(filePath);
+    try {
+      hdfs = FileSystem.newInstance(filePathObj.toUri(), new Configuration());
+      if (hdfs.exists(filePathObj)) {
+        killed = true;
+        LOG.info("file already exists -> Operator has been killed before");
+      }
+    } catch (IOException e) {
+      // without access to the marker file the kill-once logic cannot work
+      throw new RuntimeException("Unable to access marker file " + filePath, e);
+    }
+  }
+
+  /**
+   * Closes the file system instance opened in {@link #setup}.
+   */
+  @Override
+  public void teardown()
+  {
+    if (hdfs != null) {
+      try {
+        hdfs.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close file system", e);
+      }
+    }
+    super.teardown();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    LOG.debug("WindowId: {}", windowId);
+    ++emptyWindowsCount;
+  }
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    /**
+     * Passes the line through. Once {@code tuplesUntilKill} tuples have been passed (and only if the operator
+     * has not been killed before), creates the marker file and throws an exception to kill the operator.
+     *
+     * @param line the incoming tuple
+     */
+    @Override
+    public void process(String line)
+    {
+      if (emptyWindowsCount <= WARM_UP_WINDOWS) {
+        // tuples arriving during the warm-up windows are dropped
+        return;
+      }
+      LOG.debug("LINE {}", line);
+      if (killed) {
+        output.emit(line);
+      } else if (tuplesUntilKill > 0) {
+        output.emit(line);
+        tuplesUntilKill--;
+      } else {
+        try {
+          hdfs.createNewFile(filePathObj);
+          LOG.info("Created file {}", filePath);
+        } catch (IOException e) {
+          // without the marker the operator would kill itself again after every restart
+          throw new RuntimeException("Unable to create marker file " + filePath, e);
+        }
+        //kill operator
+        LOG.info("Operator intentionally killed through exception");
+        throw new RuntimeException("Exception to intentionally kill operator");
+      }
+    }
+  };
+
+  public String getDirectoryPath()
+  {
+    return directoryPath;
+  }
+
+  public void setDirectoryPath(String directoryPath)
+  {
+    this.directoryPath = directoryPath;
+  }
+
+  public int getTuplesUntilKill()
+  {
+    return tuplesUntilKill;
+  }
+
+  public void setTuplesUntilKill(int tuplesUntilKill)
+  {
+    this.tuplesUntilKill = tuplesUntilKill;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java
new file mode 100644
index 0000000..0dd7818
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
+
+public class ValidationToFile extends AbstractSingleFileOutputOperator<byte[]>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ValidationToFile.class);
+
+  private String latestExactlyValue;
+  private String latestAtLeastValue;
+  //for tests
+  static boolean validationDone = false;
+
+  @NotNull
+  private String maxTuplesTotal;
+
+  List<String> exactlyList;
+  List<String> atLeastList;
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    exactlyList = new ArrayList<>();
+    atLeastList = new ArrayList<>();
+  }
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
+  {
+    @Override
+    public void process(byte[] tuple) {}
+  };
+
+  public final transient DefaultInputPort<byte[]> topicExactlyInput = new DefaultInputPort<byte[]>()
+  {
+    @Override
+    public void process(byte[] tuple)
+    {
+      String message = new String(tuple);
+      latestExactlyValue = message;
+      exactlyList.add(message);
+      processTuple(tuple);
+    }
+
+    @Override
+    public StreamCodec<byte[]> getStreamCodec()
+    {
+      if (ValidationToFile.this.streamCodec == null) {
+        return super.getStreamCodec();
+      } else {
+        return streamCodec;
+      }
+    }
+  };
+
+  public final transient DefaultInputPort<byte[]> topicAtLeastInput = new DefaultInputPort<byte[]>()
+  {
+    @Override
+    public void process(byte[] tuple)
+    {
+      String message = new String(tuple);
+      latestAtLeastValue = message;
+      atLeastList.add(message);
+      processTuple(tuple);
+    }
+  };
+
+  @Override
+  protected byte[] getBytesForTuple(byte[] tuple)
+  {
+    if (latestExactlyValue != null && latestAtLeastValue != null) {
+      if (latestExactlyValue.equals(maxTuplesTotal) && latestAtLeastValue.equals(maxTuplesTotal)) {
+        Set<String> exactlySet = new HashSet<>(exactlyList);
+        Set<String> atLeastSet = new HashSet<>(atLeastList);
+
+        int numDuplicatesExactly = exactlyList.size() - exactlySet.size();
+        int numDuplicatesAtLeast = atLeastList.size() - atLeastSet.size();
+        LOG.info("Duplicates: exactly-once: " + numDuplicatesExactly + ", at-least-once: " + numDuplicatesAtLeast);
+        validationDone = true;
+        return ("Duplicates: exactly-once: " + numDuplicatesExactly + ", at-least-once: " + numDuplicatesAtLeast).getBytes();
+      } else {
+        return new byte[0];
+      }
+    } else {
+      return new byte[0];
+
+    }
+  }
+
+  public String getMaxTuplesTotal()
+  {
+    return maxTuplesTotal;
+  }
+
+  public void setMaxTuplesTotal(String maxTuplesTotal)
+  {
+    this.maxTuplesTotal = maxTuplesTotal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/resources/META-INF/properties-KafkaExactlyOnceOutput.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/resources/META-INF/properties-KafkaExactlyOnceOutput.xml b/examples/kafka/src/main/resources/META-INF/properties-KafkaExactlyOnceOutput.xml
new file mode 100644
index 0000000..183d0c4
--- /dev/null
+++ b/examples/kafka/src/main/resources/META-INF/properties-KafkaExactlyOnceOutput.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+
+  <property>
+    <name>dt.operator.*.attr.MEMORY_MB</name>
+    <value>512</value>
+  </property>
+
+  <!-- set maxTuplesTotal for both BatchSequenceGenerator and ValidationToFile -->
+  <property>
+    <name>dt.operator.(sequenceGenerator)|(validationToFile).prop.maxTuplesTotal</name>
+    <value>20</value>
+  </property>
+  <!-- max tuples per window -->
+  <property>
+    <name>dt.operator.sequenceGenerator.prop.maxTuples</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.operator.passthrough.prop.directoryPath</name>
+    <value>/tmp/kafka_exactly</value>
+  </property>
+  <property>
+    <name>dt.operator.passthrough.prop.tuplesUntilKill</name>
+    <value>5</value>
+  </property>
+
+  <!-- Do not change any topics for this application-->
+  <!-- KafkaExactlyOnceOutputOperator -->
+  <property>
+    <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.topic</name>
+    <value>exactly-once</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(bootstrap.servers)</name>
+    <value>localhost:9092</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(value.serializer)</name>
+    <value>org.apache.kafka.common.serialization.StringSerializer</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(value.deserializer)</name>
+    <value>org.apache.kafka.common.serialization.StringDeserializer</value>
+  </property>
+
+  <!-- KafkaOutputOperator -->
+  <property>
+    <name>dt.operator.kafkaOutputOperator.prop.topic</name>
+    <value>at-least-once</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaOutputOperator.prop.properties(bootstrap.servers)</name>
+    <value>localhost:9092</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaOutputOperator.prop.properties(key.serializer)</name>
+    <value>org.apache.kafka.common.serialization.StringSerializer</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaOutputOperator.prop.properties(value.serializer)</name>
+    <value>org.apache.kafka.common.serialization.StringSerializer</value>
+  </property>
+
+  <!-- Validation: kafka input operator (0.9) -->
+  <property>
+    <name>dt.operator.kafkaTopicExactly.prop.topics</name>
+    <value>exactly-once</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaTopicExactly.prop.clusters</name>
+    <value>localhost:9092</value>  <!-- broker (NOT zookeeper) address -->
+  </property>
+
+  <property>
+    <name>dt.operator.kafkaTopicAtLeast.prop.topics</name>
+    <value>at-least-once</value>
+  </property>
+  <property>
+    <name>dt.operator.kafkaTopicAtLeast.prop.clusters</name>
+    <value>localhost:9092</value>  <!-- broker (NOT zookeeper) address -->
+  </property>
+
+  <!-- ValidationToFile -->
+  <property>
+    <name>dt.operator.validationToFile.prop.outputFileName</name>
+    <value>validation.txt</value>
+  </property>
+  <property>
+    <name>dt.operator.validationToFile.prop.filePath</name>
+    <value>/tmp/exactlyonceoutput</value>
+  </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
new file mode 100644
index 0000000..cc4f63c
--- /dev/null
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+  private static final String directory = "target/exactlyonceoutput";
+  private String tuplesUntilKill;
+
+  private static final int zkPort = 2181;
+  private static final int brokerPort = 9092;
+
+  private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
+
+  // broker port must match properties.xml
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+  //remove '@After' to keep validation output file
+  @Before
+  @After
+  public void cleanup()
+  {
+    FileUtils.deleteQuietly(new File(directory));
+  }
+
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+    try {
+      createTopics();
+
+      // run app asynchronously; terminate after results are checked
+      Configuration conf = getConfig();
+      LocalMode lma = LocalMode.newInstance();
+      lma.prepareDAG(new Application(), conf);
+      ValidationToFile validationToFile = (ValidationToFile)lma.getDAG().getOperatorMeta("validationToFile").getOperator();
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // waits for the validation application to be done before shutting it down and checking its output
+      int count = 1;
+      int maxSleepRounds = 300;
+      while (!validationToFile.validationDone) {
+        logger.info("Sleeping ....");
+        Thread.sleep(500);
+        if (count > maxSleepRounds) {
+          fail("validationDone flag did not get set to true in ValidationToFile operator");
+        }
+        count++;
+      }
+      lc.shutdown();
+      checkOutput();
+    } catch (ConstraintViolationException e) {
+      fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  private Configuration getConfig()
+  {
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-KafkaExactlyOnceOutput.xml"));
+    conf.set("dt.operator.passthrough.prop.directoryPath", directory);
+    conf.set("dt.operator.validationToFile.prop.filePath", directory);
+    tuplesUntilKill = conf.get("dt.operator.passthrough.prop.tuplesUntilKill");
+    return conf;
+  }
+
+  private void createTopics() throws Exception
+  {
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+    ku.createTopic("exactly-once");
+    ku.createTopic("at-least-once");
+  }
+
+  private void checkOutput() throws IOException
+  {
+    String validationOutput = "";
+    File folder = new File(directory);
+
+    FilenameFilter filenameFilter = new FilenameFilter()
+    {
+      @Override
+      public boolean accept(File dir, String name)
+      {
+        if (name.split("_")[0].equals("validation.txt")) {
+          return true;
+        }
+        return false;
+      }
+    };
+    File validationFile = folder.listFiles(filenameFilter)[0];
+    try (FileInputStream inputStream = new FileInputStream(validationFile)) {
+      while (validationOutput.isEmpty()) {
+        validationOutput = IOUtils.toString(inputStream);
+        logger.info("Validation output: {}", validationOutput);
+      }
+    }
+
+    Assert.assertTrue(validationOutput.contains("exactly-once: 0"));
+
+    //assert works only for tuplesUntilKill values low enough to kill operator before checkpointing
+    Assert.assertEquals("Duplicates: exactly-once: 0, at-least-once: " + tuplesUntilKill, validationOutput);
+  }
+}