You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/05/05 00:40:18 UTC
apex-malhar git commit: APEXMALHAR-2455 Create example for Kafka 0.9
API exactly-once output
Repository: apex-malhar
Updated Branches:
refs/heads/master 10dd94ef5 -> b5c003c94
APEXMALHAR-2455 Create example for Kafka 0.9 API exactly-once output
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b5c003c9
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b5c003c9
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b5c003c9
Branch: refs/heads/master
Commit: b5c003c94d19b1308f62dc91515e82a660024264
Parents: 10dd94e
Author: Oliver Winke <ol...@datatorrent.com>
Authored: Mon Apr 24 16:58:54 2017 -0700
Committer: Oliver Winke <ol...@datatorrent.com>
Committed: Thu May 4 14:25:15 2017 -0700
----------------------------------------------------------------------
examples/kafka/README.md | 75 +++++++++
examples/kafka/logicalDAGKafkaExactlyOnce.png | Bin 0 -> 87679 bytes
examples/kafka/pom.xml | 1 +
.../kafka/exactlyonceoutput/Application.java | 94 +++++++++++
.../BatchSequenceGenerator.java | 105 +++++++++++++
.../PassthroughFailOperator.java | 155 +++++++++++++++++++
.../exactlyonceoutput/ValidationToFile.java | 132 ++++++++++++++++
.../properties-KafkaExactlyOnceOutput.xml | 114 ++++++++++++++
.../exactlyonceoutput/ApplicationTest.java | 150 ++++++++++++++++++
9 files changed, 826 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/kafka/README.md b/examples/kafka/README.md
index 1a7a9c4..5454edc 100644
--- a/examples/kafka/README.md
+++ b/examples/kafka/README.md
@@ -14,3 +14,78 @@ them out to a Kafka topic. Each line of the input file is considered a separate
message. The topic name, the name of the directory that is monitored for input
files, and other parameters are configurable in `META_INF/properties-hdfs2kafka.xml`.
+## Kafka exactly-once output example (Kafka 0.9 API)
+
+This application verifies exactly-once semantics by writing a defined sequence of input data to two Kafka
+output operators -- one that guarantees those semantics and one that does not --
+each writing to a different topic. It deliberately causes the intermediate pass-through
+operator to fail, so that it is restarted and some tuples are reprocessed.
+Two Kafka input operators then read the tuples back from both topics, and a validation operator verifies
+that the former topic has no duplicates while the latter does. It writes a single line with the
+verification results to an HDFS file, in the following form:
+
+ Duplicates: exactly-once: 0, at-least-once: 5
+
+NOTE: KafkaInputOperator guarantees at-least-once semantics. In most scenarios
+it also yields exactly-once results, but in rare corner cases duplicate processing
+may occur. When this happens, the validation in this example reports incorrect results.
+
+**DAG of this application:**
+
+![logical DAG of application](logicalDAGKafkaExactlyOnce.png)
+
+Plain text representation of DAG:
+
+ sequenceGenerator --> passthrough ==> {kafkaExactlyOnceOutputOperator, kafkaOutputOperator(at-least-once)}
+
+ {kafkaTopicExactly, kafkaTopicAtLeast} --> validationToFile
+
+
+**Running the Application**
+
+***Run Test***
+
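+The application can be run in local mode via `ApplicationTest`, which starts an embedded Kafka broker
+ and writes the validation file to `target/exactlyonceoutput/validation.txt`. The test deletes this
+ directory afterwards; remove the `@After` annotation on `cleanup()` to keep the file.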
+
+
+***Run on Cluster***
+
+To run the application on a cluster a running Kafka service is needed.
+ A local Kafka single-node instance can easily be deployed
+ (see [kafka.apache.org/quickstart](https://kafka.apache.org/quickstart)).
+
+By default, Kafka creates a topic automatically when a message for a non-existent
+ topic arrives. If automatic topic creation is disabled, create the two
+ topics manually:
+```shell
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic exactly-once
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic at-least-once
+```
+
+The Kafka topics should be cleared/deleted after every run for the
+ validation to work correctly.
+
+Enable topic deletion in Kafka's server.properties file:
+```
+delete.topic.enable=true
+```
+
+Delete topics:
+```shell
+bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic exactly-once
+bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic at-least-once
+```
+
+Check if deletion was successful:
+```shell
+bin/kafka-topics.sh --list --zookeeper localhost:2181
+```
+
+***Properties***
+
+By default the Kafka broker is set to `localhost:9092`. To use a different broker,
+ change the `bootstrap.servers` and `clusters` values in `META-INF/properties-KafkaExactlyOnceOutput.xml`
+ (when running the local-mode test, the broker port in `ApplicationTest.java` must match).
+ The directory for the validation file and the number of tuples to be generated
+ can also be changed in that properties file.
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/logicalDAGKafkaExactlyOnce.png
----------------------------------------------------------------------
diff --git a/examples/kafka/logicalDAGKafkaExactlyOnce.png b/examples/kafka/logicalDAGKafkaExactlyOnce.png
new file mode 100644
index 0000000..0cc8004
Binary files /dev/null and b/examples/kafka/logicalDAGKafkaExactlyOnce.png differ
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml
index 497a39e..7e607e3 100644
--- a/examples/kafka/pom.xml
+++ b/examples/kafka/pom.xml
@@ -36,6 +36,7 @@
kafka2hdfs is a example show how to read lines from a Kafka topic using the new (0.9)
Kafka input operator and write them out to HDFS.
hdfs2kafka is a simple application to transfer data from HDFS to Kafka
+ KafkaExactlyOnceOutput is an example demonstrating exactly-once behavior using KafkaSinglePortExactlyOnceOutputOperator (0.9)
</description>
<dependencies>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java
new file mode 100644
index 0000000..da03645
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/Application.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * Kafka exactly-once output example (Kafka 0.9 API).
+ *
+ * <p>The same generated sequence is written to two Kafka topics: once through
+ * {@link KafkaSinglePortExactlyOnceOutputOperator} (exactly-once) and once through
+ * {@link KafkaSinglePortOutputOperator} (no exactly-once guarantee). The pass-through operator
+ * between the generator and the writers fails on purpose, so it is restarted and some tuples
+ * are reprocessed. Two {@link KafkaSinglePortInputOperator}s then read both topics back from
+ * the earliest offset, and {@link ValidationToFile} counts the duplicates of each topic and
+ * writes a single line such as:
+ *
+ * <pre>
+ * Duplicates: exactly-once: 0, at-least-once: 5
+ * </pre>
+ *
+ * <p>Operators:
+ * <ul>
+ * <li><b>BatchSequenceGenerator</b>: emits the numbers 1 to maxTuplesTotal (configurable in the
+ * properties file).</li>
+ * <li><b>PassthroughFailOperator</b>: forwards tuples and, after a configured number of tuples,
+ * writes a marker file to HDFS and throws an exception to kill itself. STRAM redeploys the
+ * operator in a new container; the marker file is checked after restart so the exception is
+ * thrown only once.</li>
+ * <li><b>KafkaSinglePortExactlyOnceOutputOperator</b>: topic, bootstrap.servers, serializer and
+ * deserializer are set in the properties file. The topic names must not be changed for this
+ * application.</li>
+ * <li><b>ValidationToFile</b>: collects the values read from each topic; once the last value of
+ * both topics equals maxTuplesTotal it counts the duplicates and writes the result to HDFS (the
+ * line is also logged in the container's dt.log).</li>
+ * </ul>
+ */
+@ApplicationAnnotation(name = "KafkaExactlyOnceOutput")
+public class Application implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // producing side: generator -> intentionally failing pass-through -> both Kafka writers
+    BatchSequenceGenerator generator = dag.addOperator("sequenceGenerator", BatchSequenceGenerator.class);
+    PassthroughFailOperator passthrough = dag.addOperator("passthrough", PassthroughFailOperator.class);
+    KafkaSinglePortExactlyOnceOutputOperator<String> exactlyOnceWriter =
+        dag.addOperator("kafkaExactlyOnceOutputOperator", KafkaSinglePortExactlyOnceOutputOperator.class);
+    KafkaSinglePortOutputOperator atLeastOnceWriter =
+        dag.addOperator("kafkaOutputOperator", KafkaSinglePortOutputOperator.class);
+
+    dag.addStream("sequenceToPassthrough", generator.out, passthrough.input);
+    dag.addStream("linesToKafka", passthrough.output, atLeastOnceWriter.inputPort,
+        exactlyOnceWriter.inputPort);
+
+    // validating side: read both topics back and compare them
+    KafkaSinglePortInputOperator exactlyReader = addEarliestKafkaInput(dag, "kafkaTopicExactly");
+    KafkaSinglePortInputOperator atLeastReader = addEarliestKafkaInput(dag, "kafkaTopicAtLeast");
+    ValidationToFile validator = dag.addOperator("validationToFile", ValidationToFile.class);
+
+    dag.addStream("messagesFromExactly", exactlyReader.outputPort, validator.topicExactlyInput);
+    dag.addStream("messagesFromAtLeast", atLeastReader.outputPort, validator.topicAtLeastInput);
+  }
+
+  /**
+   * Adds a Kafka input operator to {@code dag} that starts reading at the EARLIEST offset.
+   *
+   * @param dag the DAG to add the operator to
+   * @param name the operator name (also used as the properties-file key)
+   * @return the newly added operator
+   */
+  private static KafkaSinglePortInputOperator addEarliestKafkaInput(DAG dag, String name)
+  {
+    KafkaSinglePortInputOperator reader = dag.addOperator(name, KafkaSinglePortInputOperator.class);
+    reader.setInitialOffset(KafkaSinglePortInputOperator.InitialOffset.EARLIEST.name());
+    return reader;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java
new file mode 100644
index 0000000..1654434
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/BatchSequenceGenerator.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Simple input operator that emits the Strings "1", "2", ... up to {@code maxTuplesTotal}.
+ *
+ * <p>At most {@code maxTuples} tuples are emitted per streaming window, and nothing is emitted
+ * during the first {@value #EMPTY_WINDOWS_BEFORE_START} windows. After each emitted tuple the
+ * operator sleeps for {@code sleepTime} milliseconds (0 by default).
+ */
+public class BatchSequenceGenerator extends BaseOperator implements InputOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(BatchSequenceGenerator.class);
+
+  /** Number of initial windows in which nothing is emitted, so that tests run reliably. */
+  private static final int EMPTY_WINDOWS_BEFORE_START = 3;
+
+  // properties
+
+  @Min(1)
+  private int maxTuplesTotal; // max number of tuples in total
+  @Min(1)
+  private int maxTuples; // max number of tuples per window
+  @Min(0)
+  private int sleepTime; // milliseconds to sleep after each emitted tuple
+
+  private int numTuplesTotal = 0; // tuples emitted so far; also the last emitted value
+
+  // number of windows begun so far; emission starts once it exceeds EMPTY_WINDOWS_BEFORE_START
+  private int emptyWindowsCount = 0;
+
+  // transient fields
+
+  private transient int numTuples = 0; // number emitted in current window
+
+  public final transient DefaultOutputPort<String> out = new DefaultOutputPort<>();
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    numTuples = 0;
+    super.beginWindow(windowId);
+    LOG.debug("beginWindow: {}", windowId);
+    ++emptyWindowsCount;
+  }
+
+  /**
+   * Emits at most one tuple per call, as long as the total and per-window limits have not been
+   * reached and the initial empty windows have passed.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (numTuplesTotal < maxTuplesTotal && numTuples < maxTuples
+        && emptyWindowsCount > EMPTY_WINDOWS_BEFORE_START) {
+      ++numTuplesTotal;
+      ++numTuples;
+      out.emit(String.valueOf(numTuplesTotal));
+      LOG.debug("Line emitted: {}", numTuplesTotal);
+
+      try {
+        // throttle so this method is not called in a tight loop
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.info("Sleep interrupted");
+        // restore the interrupt status so the owning thread can still observe it
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  public int getMaxTuples()
+  {
+    return maxTuples;
+  }
+
+  public void setMaxTuples(int v)
+  {
+    maxTuples = v;
+  }
+
+  public int getMaxTuplesTotal()
+  {
+    return maxTuplesTotal;
+  }
+
+  public void setMaxTuplesTotal(int v)
+  {
+    maxTuplesTotal = v;
+  }
+
+  /**
+   * @return milliseconds slept after each emitted tuple
+   */
+  public int getSleepTime()
+  {
+    return sleepTime;
+  }
+
+  /**
+   * @param sleepTime milliseconds to sleep after each emitted tuple (must be non-negative)
+   */
+  public void setSleepTime(int sleepTime)
+  {
+    this.sleepTime = sleepTime;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java
new file mode 100644
index 0000000..487c773
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/PassthroughFailOperator.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * To produce an exactly-once scenario the PassthroughFailOperator kills itself after a certain number
+ * of processed lines by throwing an exception. YARN will deploy the Operator in a new container,
+ * hence not checkpointed tuples will be passed to the OutputOperators more than once.
+ */
+public class PassthroughFailOperator extends BaseOperator
+{
+ private static final Logger LOG = LoggerFactory.getLogger(PassthroughFailOperator.class);
+ private boolean killed;
+
+ @NotNull
+ private int tuplesUntilKill;
+
+ //start with empty windows to ensure tests run reliable
+ private int emptyWindowsCount = 0;
+
+ @NotNull
+ private String directoryPath;
+
+ private String filePath;
+ private transient FileSystem hdfs;
+ private transient Path filePathObj;
+
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+ /**
+ * Loads file from HDFS and sets {@link #killed} flag if it already exists
+ *
+ * @param context
+ */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ String appId = context.getValue(Context.DAGContext.APPLICATION_ID);
+ filePath = directoryPath + "/" + appId;
+
+ LOG.info("FilePath: " + filePath);
+ filePathObj = new Path(filePath);
+ try {
+ hdfs = FileSystem.newInstance(filePathObj.toUri(), new Configuration());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ if (hdfs.exists(filePathObj)) {
+ killed = true;
+ LOG.info("file already exists -> Operator has been killed before");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ LOG.debug("WindowId: " + windowId);
+ ++emptyWindowsCount;
+ }
+
+ public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+ {
+ /**
+ * Creates file on HDFS identified by ApplicationId to save killed state, if operator has not been killed yet.
+ * Throws Exception to kill operator.
+ *
+ * @param line
+ */
+ @Override
+ public void process(String line)
+ {
+ if (emptyWindowsCount > 3) {
+ LOG.debug("LINE " + line);
+ if (killed) {
+ output.emit(line);
+ } else if (tuplesUntilKill > 0) {
+ output.emit(line);
+ tuplesUntilKill--;
+ } else {
+ try {
+ hdfs.createNewFile(filePathObj);
+ LOG.info("Created file " + filePath);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ //kill operator
+ LOG.info("Operator intentionally killed through exception");
+ RuntimeException e = new RuntimeException("Exception to intentionally kill operator");
+ throw e;
+ }
+ }
+ }
+ };
+
+ public String getDirectoryPath()
+ {
+ return directoryPath;
+ }
+
+ public void setDirectoryPath(String directoryPath)
+ {
+ this.directoryPath = directoryPath;
+ }
+
+ public int getTuplesUntilKill()
+ {
+ return tuplesUntilKill;
+ }
+
+ public void setTuplesUntilKill(int tuplesUntilKill)
+ {
+ this.tuplesUntilKill = tuplesUntilKill;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java
new file mode 100644
index 0000000..0dd7818
--- /dev/null
+++ b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/exactlyonceoutput/ValidationToFile.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
+
+/**
+ * Validation operator for the exactly-once output example.
+ *
+ * <p>Collects the values read back from the exactly-once topic and the at-least-once topic. Once
+ * the latest value received from both topics equals {@code maxTuplesTotal}, it counts the
+ * duplicates per topic and writes one line of the form
+ * {@code Duplicates: exactly-once: 0, at-least-once: 5} to the output file.
+ */
+public class ValidationToFile extends AbstractSingleFileOutputOperator<byte[]>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ValidationToFile.class);
+
+  // Kafka's StringSerializer (configured in the properties file) encodes messages as UTF-8
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+  private String latestExactlyValue;
+  private String latestAtLeastValue;
+
+  /**
+   * Set once the validation line has been produced. It is static so tests can poll it, and
+   * volatile because the test polls it from a different thread than the one that sets it.
+   */
+  static volatile boolean validationDone = false;
+
+  @NotNull
+  private String maxTuplesTotal;
+
+  List<String> exactlyList;
+  List<String> atLeastList;
+
+  /**
+   * Creates the value lists if they do not exist yet. Lists that already hold values (these
+   * fields are non-transient operator state) are kept instead of being wiped on redeploy.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    if (exactlyList == null) {
+      exactlyList = new ArrayList<>();
+    }
+    if (atLeastList == null) {
+      atLeastList = new ArrayList<>();
+    }
+  }
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
+  {
+    @Override
+    public void process(byte[] tuple)
+    {
+      // intentionally unused: data arrives on the two topic ports below
+    }
+  };
+
+  public final transient DefaultInputPort<byte[]> topicExactlyInput = new DefaultInputPort<byte[]>()
+  {
+    @Override
+    public void process(byte[] tuple)
+    {
+      String message = new String(tuple, UTF_8);
+      latestExactlyValue = message;
+      exactlyList.add(message);
+      processTuple(tuple);
+    }
+
+    @Override
+    public StreamCodec<byte[]> getStreamCodec()
+    {
+      if (ValidationToFile.this.streamCodec == null) {
+        return super.getStreamCodec();
+      } else {
+        return streamCodec;
+      }
+    }
+  };
+
+  public final transient DefaultInputPort<byte[]> topicAtLeastInput = new DefaultInputPort<byte[]>()
+  {
+    @Override
+    public void process(byte[] tuple)
+    {
+      String message = new String(tuple, UTF_8);
+      latestAtLeastValue = message;
+      atLeastList.add(message);
+      processTuple(tuple);
+    }
+
+    // same codec selection as topicExactlyInput, so both ports behave consistently
+    @Override
+    public StreamCodec<byte[]> getStreamCodec()
+    {
+      if (ValidationToFile.this.streamCodec == null) {
+        return super.getStreamCodec();
+      } else {
+        return streamCodec;
+      }
+    }
+  };
+
+  /**
+   * Returns the validation line once the latest value received on both topic ports equals
+   * {@code maxTuplesTotal}; otherwise returns an empty array so nothing is written.
+   */
+  @Override
+  protected byte[] getBytesForTuple(byte[] tuple)
+  {
+    boolean exactlyComplete = latestExactlyValue != null && latestExactlyValue.equals(maxTuplesTotal);
+    boolean atLeastComplete = latestAtLeastValue != null && latestAtLeastValue.equals(maxTuplesTotal);
+    if (!exactlyComplete || !atLeastComplete) {
+      return new byte[0];
+    }
+
+    int numDuplicatesExactly = countDuplicates(exactlyList);
+    int numDuplicatesAtLeast = countDuplicates(atLeastList);
+    String result = "Duplicates: exactly-once: " + numDuplicatesExactly + ", at-least-once: " + numDuplicatesAtLeast;
+    LOG.info(result);
+    validationDone = true;
+    return result.getBytes(UTF_8);
+  }
+
+  /**
+   * Returns how many entries of {@code values} repeat an earlier entry.
+   */
+  private static int countDuplicates(List<String> values)
+  {
+    Set<String> distinct = new HashSet<>(values);
+    return values.size() - distinct.size();
+  }
+
+  public String getMaxTuplesTotal()
+  {
+    return maxTuplesTotal;
+  }
+
+  public void setMaxTuplesTotal(String maxTuplesTotal)
+  {
+    this.maxTuplesTotal = maxTuplesTotal;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/main/resources/META-INF/properties-KafkaExactlyOnceOutput.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/resources/META-INF/properties-KafkaExactlyOnceOutput.xml b/examples/kafka/src/main/resources/META-INF/properties-KafkaExactlyOnceOutput.xml
new file mode 100644
index 0000000..183d0c4
--- /dev/null
+++ b/examples/kafka/src/main/resources/META-INF/properties-KafkaExactlyOnceOutput.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+
+ <property>
+ <name>dt.operator.*.attr.MEMORY_MB</name>
+ <value>512</value>
+ </property>
+
+ <!-- set maxTuplesTotal for both BatchSequenceGenerator and ValidationToFile -->
+ <property>
+ <name>dt.operator.(sequenceGenerator)|(validationToFile).prop.maxTuplesTotal</name>
+ <value>20</value>
+ </property>
+ <!-- max tuples per window -->
+ <property>
+ <name>dt.operator.sequenceGenerator.prop.maxTuples</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dt.operator.passthrough.prop.directoryPath</name>
+ <value>/tmp/kafka_exactly</value>
+ </property>
+ <property>
+ <name>dt.operator.passthrough.prop.tuplesUntilKill</name>
+ <value>5</value>
+ </property>
+
+ <!-- Do not change any topics for this application-->
+ <!-- KafkaExactlyOnceOutputOperator -->
+ <property>
+ <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.topic</name>
+ <value>exactly-once</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(bootstrap.servers)</name>
+ <value>localhost:9092</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(value.serializer)</name>
+ <value>org.apache.kafka.common.serialization.StringSerializer</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(value.deserializer)</name>
+ <value>org.apache.kafka.common.serialization.StringDeserializer</value>
+ </property>
+
+ <!-- KafkaOutputOperator -->
+ <property>
+ <name>dt.operator.kafkaOutputOperator.prop.topic</name>
+ <value>at-least-once</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaOutputOperator.prop.properties(bootstrap.servers)</name>
+ <value>localhost:9092</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaOutputOperator.prop.properties(key.serializer)</name>
+ <value>org.apache.kafka.common.serialization.StringSerializer</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaOutputOperator.prop.properties(value.serializer)</name>
+ <value>org.apache.kafka.common.serialization.StringSerializer</value>
+ </property>
+
+ <!-- Validation: kafka input operator (0.9) -->
+ <property>
+ <name>dt.operator.kafkaTopicExactly.prop.topics</name>
+ <value>exactly-once</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaTopicExactly.prop.clusters</name>
+ <value>localhost:9092</value> <!-- broker (NOT zookeeper) address -->
+</property>
+
+ <property>
+ <name>dt.operator.kafkaTopicAtLeast.prop.topics</name>
+ <value>at-least-once</value>
+ </property>
+ <property>
+ <name>dt.operator.kafkaTopicAtLeast.prop.clusters</name>
+ <value>localhost:9092</value> <!-- broker (NOT zookeeper) address -->
+ </property>
+
+ <!-- ValidationToFile -->
+ <property>
+ <name>dt.operator.validationToFile.prop.outputFileName</name>
+ <value>validation.txt</value>
+ </property>
+ <property>
+ <name>dt.operator.validationToFile.prop.filePath</name>
+ <value>/tmp/exactlyonceoutput</value>
+ </property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b5c003c9/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
new file mode 100644
index 0000000..cc4f63c
--- /dev/null
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.kafka.exactlyonceoutput;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+ private static final String directory = "target/exactlyonceoutput";
+ private String tuplesUntilKill;
+
+ private static final int zkPort = 2181;
+ private static final int brokerPort = 9092;
+
+ private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
+
+ // broker port must match properties.xml
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+ //remove '@After' to keep validation output file
+ @Before
+ @After
+ public void cleanup()
+ {
+ FileUtils.deleteQuietly(new File(directory));
+ }
+
+ @Test
+ public void testApplication() throws IOException, Exception
+ {
+ try {
+ createTopics();
+
+ // run app asynchronously; terminate after results are checked
+ Configuration conf = getConfig();
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(new Application(), conf);
+ ValidationToFile validationToFile = (ValidationToFile)lma.getDAG().getOperatorMeta("validationToFile").getOperator();
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+
+ // waits for the validation application to be done before shutting it down and checking its output
+ int count = 1;
+ int maxSleepRounds = 300;
+ while (!validationToFile.validationDone) {
+ logger.info("Sleeping ....");
+ Thread.sleep(500);
+ if (count > maxSleepRounds) {
+ fail("validationDone flag did not get set to true in ValidationToFile operator");
+ }
+ count++;
+ }
+ lc.shutdown();
+ checkOutput();
+ } catch (ConstraintViolationException e) {
+ fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+ private Configuration getConfig()
+ {
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-KafkaExactlyOnceOutput.xml"));
+ conf.set("dt.operator.passthrough.prop.directoryPath", directory);
+ conf.set("dt.operator.validationToFile.prop.filePath", directory);
+ tuplesUntilKill = conf.get("dt.operator.passthrough.prop.tuplesUntilKill");
+ return conf;
+ }
+
+ private void createTopics() throws Exception
+ {
+ KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+ ku.createTopic("exactly-once");
+ ku.createTopic("at-least-once");
+ }
+
+ private void checkOutput() throws IOException
+ {
+ String validationOutput = "";
+ File folder = new File(directory);
+
+ FilenameFilter filenameFilter = new FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ if (name.split("_")[0].equals("validation.txt")) {
+ return true;
+ }
+ return false;
+ }
+ };
+ File validationFile = folder.listFiles(filenameFilter)[0];
+ try (FileInputStream inputStream = new FileInputStream(validationFile)) {
+ while (validationOutput.isEmpty()) {
+ validationOutput = IOUtils.toString(inputStream);
+ logger.info("Validation output: {}", validationOutput);
+ }
+ }
+
+ Assert.assertTrue(validationOutput.contains("exactly-once: 0"));
+
+ //assert works only for tuplesUntilKill values low enough to kill operator before checkpointing
+ Assert.assertEquals("Duplicates: exactly-once: 0, at-least-once: " + tuplesUntilKill, validationOutput);
+ }
+}