You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/08 11:07:09 UTC

[apex-malhar] branch master updated (783b7fe -> 658cafc)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git.


    from 783b7fe  APEXMALHAR-2530 Refactored AbstractAppDataSnapshotServer so that subclasses don't need schemas
     new 42bfa56  Kafka to JDBC exactly-once example.
     new 4aa3812  Use algo/UniqueCounter to reduce code
     new ddbf26f  Atomic file output app
     new 9ad8447  APEXMALHAR-2233 Migrate exactly-once examples.
     new 658cafc  Expand README and improve formatting.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 examples/exactly-once/README.md                    |  38 ++++++
 examples/{transform => exactly-once}/pom.xml       |  35 +++---
 .../src/assemble/appPackage.xml                    |   0
 .../exactlyonce/ExactlyOnceFileOutputApp.java      | 116 +++++++++++++++++
 .../exactlyonce/ExactlyOnceJdbcOutputApp.java      | 121 ++++++++++++++++++
 .../src/main/resources/META-INF/properties.xml     |  30 +++--
 .../exactlyonce/ExactlyOnceFileOutputAppTest.java  | 109 ++++++++++++++++
 .../exactlyonce/ExactlyOnceJdbcOutputTest.java     | 140 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |   4 +-
 examples/pom.xml                                   |   1 +
 10 files changed, 561 insertions(+), 33 deletions(-)
 create mode 100644 examples/exactly-once/README.md
 copy examples/{transform => exactly-once}/pom.xml (70%)
 copy examples/{throttle => exactly-once}/src/assemble/appPackage.xml (100%)
 create mode 100644 examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
 create mode 100644 examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
 copy examples/{ftp => exactly-once}/src/main/resources/META-INF/properties.xml (59%)
 create mode 100644 examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
 create mode 100644 examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
 copy examples/{dedup => exactly-once}/src/test/resources/log4j.properties (98%)

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.

[apex-malhar] 03/05: Atomic file output app

Posted by th...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit ddbf26f00f5fa04774792294ccab7599667f1b77
Author: Chandni Singh <cs...@apache.org>
AuthorDate: Wed Mar 2 18:27:15 2016 -0800

    Atomic file output app
---
 .../com/example/myapexapp/AtomicFileOutputApp.java | 101 +++++++++++++++++++++
 .../example/myapexapp/AtomicFileOutputAppTest.java |  93 +++++++++++++++++++
 2 files changed, 194 insertions(+)

diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java b/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
new file mode 100644
index 0000000..55f8cc6
--- /dev/null
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+package com.example.myapexapp;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+@ApplicationAnnotation(name = "AtomicFileOutput")
+public class AtomicFileOutputApp implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
+        new KafkaSinglePortStringInputOperator());
+    kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
+    Application.UniqueCounterFlat count = dag.addOperator("count", new Application.UniqueCounterFlat());
+
+    FileWriter fileWriter = dag.addOperator("fileWriter", new FileWriter());
+
+    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("words", kafkaInput.outputPort, count.data);
+    dag.addStream("counts", count.counts, fileWriter.input, cons.input);
+  }
+
+  /**
+   * This implementation of {@link AbstractFileOutputOperator} writes to a single file. However when it doesn't
+   * receive any tuples in an application window then it finalizes the file, i.e., the file is completed and will not
+   * be opened again.
+   * <p/>
+   * If more tuples are received after a hiatus then they will be written to a part file -
+   * {@link #FILE_NAME_PREFIX}.{@link #part}
+   */
+  public static class FileWriter extends AbstractFileOutputOperator<KeyValPair<String, Integer>>
+  {
+    static final String FILE_NAME_PREFIX = "filestore";
+
+    private int part;
+    private transient String currentFileName;
+
+    private transient boolean receivedTuples;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      currentFileName = (part == 0) ? FILE_NAME_PREFIX : FILE_NAME_PREFIX + "." + part;
+      super.setup(context);
+    }
+
+    @Override
+    protected String getFileName(KeyValPair<String, Integer> keyValPair)
+    {
+      return currentFileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(KeyValPair<String, Integer> keyValPair)
+    {
+      return (keyValPair.toString() + "\n").getBytes();
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      receivedTuples = false;
+    }
+
+    @Override
+    protected void processTuple(KeyValPair<String, Integer> tuple)
+    {
+      super.processTuple(tuple);
+      receivedTuples = true;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      //request for finalization if there is no input. This is done automatically if the file is rotated periodically
+      // or has a size threshold.
+      if (!receivedTuples && !endOffsets.isEmpty()) {
+        requestFinalize(currentFileName);
+        part++;
+        currentFileName = FILE_NAME_PREFIX + "." + part;
+      }
+    }
+  }
+}
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
new file mode 100644
index 0000000..b539394
--- /dev/null
+++ b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
@@ -0,0 +1,93 @@
+package com.example.myapexapp;
+
+import java.io.File;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
+import com.datatorrent.contrib.kafka.KafkaTestProducer;
+
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+public class AtomicFileOutputAppTest
+{
+  private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+  private static final String KAFKA_TOPIC = "exactly-once-test";
+  private static final String TARGET_DIR = "target/atomicFileOutput";
+
+  @Before
+  public void beforeTest() throws Exception {
+    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
+    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
+    kafkaLauncher.startZookeeper();
+    kafkaLauncher.startKafkaServer();
+    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
+  }
+
+  @After
+  public void afterTest() {
+    kafkaLauncher.stopKafkaServer();
+    kafkaLauncher.stopZookeeper();
+  }
+
+  @Test
+  public void testApplication() throws Exception {
+    try {
+
+      File targetDir = new File(TARGET_DIR);
+      FileUtils.deleteDirectory(targetDir);
+      FileUtils.forceMkdir(targetDir);
+
+      // produce some test data
+      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
+      String[] words = "count the words from kafka and store them in the db".split("\\s+");
+      p.setMessages(Lists.newArrayList(words));
+      new Thread(p).start();
+
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
+      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+      conf.set("dt.operator.fileWriter.prop.filePath", TARGET_DIR);
+
+      lma.prepareDAG(new AtomicFileOutputApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync(); // test will terminate after results are available
+
+      long timeout = System.currentTimeMillis() + 60000; // 60s timeout
+
+      File outputFile = new File(TARGET_DIR, AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX);
+      while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
+        Thread.sleep(1000);
+        LOG.debug("Waiting for {}", outputFile);
+      }
+
+      Assert.assertTrue("output file exists " + AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
+          outputFile.isFile());
+
+      lc.shutdown();
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.

[apex-malhar] 02/05: Use algo/UniqueCounter to reduce code

Posted by th...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit 4aa381224658c5ca92431e31cb10220a5a50339c
Author: Munagala V. Ramanath <am...@users.noreply.github.com>
AuthorDate: Tue Feb 23 05:58:46 2016 -0800

    Use algo/UniqueCounter to reduce code
---
 .../java/com/example/myapexapp/Application.java    | 37 ++++------------------
 1 file changed, 7 insertions(+), 30 deletions(-)

diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
index 8050d67..7700d68 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
@@ -16,6 +16,7 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
 import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -32,12 +33,12 @@ public class Application implements StreamingApplication
   {
     KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
     kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
-    UniqueCounter<String> count = dag.addOperator("count", new UniqueCounter<String>());
+    UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
     CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
     store.setStore(new JdbcTransactionalStore());
     ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
     dag.addStream("words", kafkaInput.outputPort, count.data);
-    dag.addStream("counts", count.count, store.input, cons.input);
+    dag.addStream("counts", count.counts, store.input, cons.input);
   }
 
   public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
@@ -62,42 +63,18 @@ public class Application implements StreamingApplication
     }
   }
 
-  public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
+  public static class UniqueCounterFlat extends UniqueCounter<String>
   {
-    /**
-     * The input port which receives incoming tuples.
-     */
-    public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
-    {
-      /**
-       * Reference counts tuples
-       */
-      @Override
-      public void process(K tuple)
-      {
-        processTuple(tuple);
-      }
-
-    };
-
-    public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
-    {
-      @Override
-      public Unifier<KeyValPair<K, Integer>> getUnifier()
-      {
-        throw new UnsupportedOperationException("not partitionable");
-      }
-    };
+    public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
 
     @Override
     public void endWindow()
     {
-      for (Map.Entry<K, MutableInt> e: map.entrySet()) {
-        count.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
+      for (Map.Entry<String, MutableInt> e: map.entrySet()) {
+        counts.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
       }
       map.clear();
     }
-
   }
 
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.

[apex-malhar] 04/05: APEXMALHAR-2233 Migrate exactly-once examples.

Posted by th...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit 9ad844720f2b741bd54cfdda9d5f3a418921e6b1
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Jul 30 23:51:40 2017 -0700

    APEXMALHAR-2233 Migrate exactly-once examples.
---
 examples/exactly-once/README.md                    |  17 +++
 examples/exactly-once/pom.xml                      |  62 +++++++++
 examples/exactly-once/src/assemble/appPackage.xml  |  20 +++
 .../example/myapexapp/RandomNumberGenerator.java   |  47 -------
 .../exactlyonce/ExactlyOnceFileOutputApp.java}     |  33 +++--
 .../exactlyonce/ExactlyOnceJdbcOutputApp.java}     |  56 +++++++--
 .../src/main/resources/META-INF/properties.xml     |  52 ++++++--
 .../exactly-once/src/site/conf/my-app-conf1.xml    |  11 --
 .../com/example/myapexapp/ApplicationTest.java     | 124 ------------------
 .../example/myapexapp/AtomicFileOutputAppTest.java |  93 --------------
 .../exactlyonce/ExactlyOnceFileOutputAppTest.java  | 109 ++++++++++++++++
 .../exactlyonce/ExactlyOnceJdbcOutputTest.java     | 139 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  24 +++-
 examples/pom.xml                                   |   1 +
 14 files changed, 479 insertions(+), 309 deletions(-)

diff --git a/examples/exactly-once/README.md b/examples/exactly-once/README.md
new file mode 100644
index 0000000..5254b4c
--- /dev/null
+++ b/examples/exactly-once/README.md
@@ -0,0 +1,17 @@
+# Examples for end-to-end exactly-once
+
+## Read from Kafka, write to JDBC
+
+This application shows exactly-once output to JDBC through transactions:
+
+[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java)
+
+[Test](src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java)
+
+## Read from Kafka, write to Files
+
+This application shows exactly-once output to HDFS through atomic file operation:
+
+[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java)
+
+[Test](src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java)
diff --git a/examples/exactly-once/pom.xml b/examples/exactly-once/pom.xml
new file mode 100644
index 0000000..c198f48
--- /dev/null
+++ b/examples/exactly-once/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-examples-exactly-once</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Exactly-Once Examples</name>
+  
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>info.batey.kafka</groupId>
+      <artifactId>kafka-unit</artifactId>
+      <version>0.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>2.3.4</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/examples/exactly-once/src/assemble/appPackage.xml b/examples/exactly-once/src/assemble/appPackage.xml
index 7ad071c..a870807 100644
--- a/examples/exactly-once/src/assemble/appPackage.xml
+++ b/examples/exactly-once/src/assemble/appPackage.xml
@@ -1,3 +1,23 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
deleted file mode 100644
index eed344b..0000000
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * This is a simple operator that emits random number.
- */
-public class RandomNumberGenerator extends BaseOperator implements InputOperator
-{
-  private int numTuples = 100;
-  private transient int count = 0;
-
-  public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    count = 0;
-  }
-
-  @Override
-  public void emitTuples()
-  {
-    if (count++ < numTuples) {
-      out.emit(Math.random());
-    }
-  }
-
-  public int getNumTuples()
-  {
-    return numTuples;
-  }
-
-  /**
-   * Sets the number of tuples to be emitted every window.
-   * @param numTuples number of tuples
-   */
-  public void setNumTuples(int numTuples)
-  {
-    this.numTuples = numTuples;
-  }
-}
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
similarity index 67%
rename from examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
rename to examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
index 55f8cc6..9aa2870 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
@@ -1,32 +1,47 @@
 /**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package com.example.myapexapp;
+package org.apache.apex.examples.exactlyonce;
 
+import org.apache.apex.examples.exactlyonce.ExactlyOnceJdbcOutputApp.KafkaSinglePortStringInputOperator;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
 import com.datatorrent.lib.util.KeyValPair;
 
-@ApplicationAnnotation(name = "AtomicFileOutput")
-public class AtomicFileOutputApp implements StreamingApplication
+@ApplicationAnnotation(name = "ExactlyOnceFileOutput")
+public class ExactlyOnceFileOutputApp implements StreamingApplication
 {
   @Override
   public void populateDAG(DAG dag, Configuration configuration)
   {
     KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
         new KafkaSinglePortStringInputOperator());
-    kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    kafkaInput.setWindowDataManager(new FSWindowDataManager());
 
-    Application.UniqueCounterFlat count = dag.addOperator("count", new Application.UniqueCounterFlat());
+    ExactlyOnceJdbcOutputApp.UniqueCounterFlat count = dag.addOperator("count",
+        new ExactlyOnceJdbcOutputApp.UniqueCounterFlat());
 
     FileWriter fileWriter = dag.addOperator("fileWriter", new FileWriter());
 
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
similarity index 57%
rename from examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
rename to examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
index 7700d68..33ae9dc 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
@@ -1,38 +1,55 @@
 /**
- * Put your copyright and license info here.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package com.example.myapexapp;
+package org.apache.apex.examples.exactlyonce;
 
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Map;
+import java.util.Properties;
 
+import org.apache.apex.malhar.kafka.AbstractKafkaConsumer;
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.apex.malhar.kafka.KafkaConsumer09;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
 import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
-import com.datatorrent.lib.util.BaseUniqueKeyCounter;
 import com.datatorrent.lib.util.KeyValPair;
 
-@ApplicationAnnotation(name="ExactlyOnceExampleApplication")
-public class Application implements StreamingApplication
+@ApplicationAnnotation(name = "ExactlyOnceJbdcOutput")
+public class ExactlyOnceJdbcOutputApp implements StreamingApplication
 {
 
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
-    kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    kafkaInput.setWindowDataManager(new FSWindowDataManager());
     UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
     CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
     store.setStore(new JdbcTransactionalStore());
@@ -77,4 +94,21 @@ public class Application implements StreamingApplication
     }
   }
 
+  public static class KafkaSinglePortStringInputOperator extends AbstractKafkaInputOperator
+  {
+    public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
+
+    @Override
+    public AbstractKafkaConsumer createConsumer(Properties properties)
+    {
+      return new KafkaConsumer09(properties);
+    }
+
+    @Override
+    protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
+    {
+      outputPort.emit(new String(message.value()));
+    }
+  }
+
 }
diff --git a/examples/exactly-once/src/main/resources/META-INF/properties.xml b/examples/exactly-once/src/main/resources/META-INF/properties.xml
index 876c39a..70f8812 100644
--- a/examples/exactly-once/src/main/resources/META-INF/properties.xml
+++ b/examples/exactly-once/src/main/resources/META-INF/properties.xml
@@ -1,24 +1,52 @@
 <?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <configuration>
-  <!-- 
+
   <property>
-    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
-    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+    <name>apex.operator.kafkaInput.prop.initialPartitionCount</name>
+    <value>1</value>
   </property>
-  -->
-  <!-- memory assigned to app master
   <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
+    <name>apex.operator.kafkaInput.prop.topics</name>
+    <value>exactly-once-example</value>
   </property>
-  -->
   <property>
-    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
-    <value>1000</value>
+    <name>apex.operator.kafkaInput.prop.clusters</name>
+    <value>localhost:9092</value>
   </property>
+  
   <property>
-    <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
-    <value>hello world: %s</value>
+    <name>apex.application.ExactlyOnceJbdcOutput.operator.store.prop.store.databaseDriver</name>
+    <value>org.hsqldb.jdbcDriver</value>
   </property>
+  <property>
+    <name>apex.application.ExactlyOnceJbdcOutput.operator.store.prop.store.databaseUrl</name>
+    <value>jdbc:hsqldb:mem:test;sql.syntax_mys=true</value>
+  </property>
+
+  <property>
+    <name>apex.application.ExactlyOnceFileOutput.operator.fileWriter.prop.filePath</name>
+    <value>target/atomicFileOutput</value>
+  </property>
+
 </configuration>
 
diff --git a/examples/exactly-once/src/site/conf/my-app-conf1.xml b/examples/exactly-once/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index ccb2b66..0000000
--- a/examples/exactly-once/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<configuration>
-  <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-  <property>
-    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
-    <value>1000</value>
-  </property>
-</configuration>
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index c5aa69c..0000000
--- a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import java.io.File;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.HashSet;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
-import com.datatorrent.contrib.kafka.KafkaTestProducer;
-import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
-import com.example.myapexapp.Application;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Test the application in local mode.
- */
-public class ApplicationTest
-{
-  private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-  private static final String KAFKA_TOPIC = "exactly-once-test";
-  private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
-  private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
-  private static final String TABLE_NAME = "WORDS";
-
-  @Before
-  public void beforeTest() throws Exception {
-    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
-    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
-    kafkaLauncher.startZookeeper();
-    kafkaLauncher.startKafkaServer();
-    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
-
-    // setup hsqldb
-    Class.forName(DB_DRIVER).newInstance();
-
-    Connection con = DriverManager.getConnection(DB_URL);
-    Statement stmt = con.createStatement();
-
-    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
-        + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
-        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
-        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
-        + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
-        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
-        + ")";
-    stmt.executeUpdate(createMetaTable);
-
-    String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
-        + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
-    stmt.executeUpdate(createTable);
-
-  }
-
-  @After
-  public void afterTest() {
-    kafkaLauncher.stopKafkaServer();
-    kafkaLauncher.stopZookeeper();
-  }
-
-  @Test
-  public void testApplication() throws Exception {
-    try {
-      // produce some test data
-      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
-      String[] words = "count the words from kafka and store them in the db".split("\\s+");
-      p.setMessages(Lists.newArrayList(words));
-      new Thread(p).start();
-
-      LocalMode lma = LocalMode.newInstance();
-      Configuration conf = new Configuration(false);
-      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
-      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
-      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
-      conf.set("dt.operator.store.prop.store.databaseDriver", DB_DRIVER);
-      conf.set("dt.operator.store.prop.store.databaseUrl", DB_URL);
-
-      lma.prepareDAG(new Application(), conf);
-      LocalMode.Controller lc = lma.getController();
-      lc.runAsync(); // test will terminate after results are available
-
-      HashSet<String> wordsSet = Sets.newHashSet(words);
-      Connection con = DriverManager.getConnection(DB_URL);
-      Statement stmt = con.createStatement();
-      int rowCount = 0;
-      long timeout = System.currentTimeMillis() + 30000; // 30s timeout
-      while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
-        Thread.sleep(1000);
-        String countQuery = "SELECT count(*) from " + TABLE_NAME;
-        ResultSet resultSet = stmt.executeQuery(countQuery);
-        resultSet.next();
-        rowCount = resultSet.getInt(1);
-        resultSet.close();
-        LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
-      }
-      Assert.assertEquals("number of words", wordsSet.size(), rowCount);
-
-      lc.shutdown();
-
-    } catch (ConstraintViolationException e) {
-      Assert.fail("constraint violations: " + e.getConstraintViolations());
-    }
-  }
-
-}
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
deleted file mode 100644
index b539394..0000000
--- a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.example.myapexapp;
-
-import java.io.File;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
-import com.datatorrent.contrib.kafka.KafkaTestProducer;
-
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-public class AtomicFileOutputAppTest
-{
-  private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-  private static final String KAFKA_TOPIC = "exactly-once-test";
-  private static final String TARGET_DIR = "target/atomicFileOutput";
-
-  @Before
-  public void beforeTest() throws Exception {
-    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
-    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
-    kafkaLauncher.startZookeeper();
-    kafkaLauncher.startKafkaServer();
-    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
-  }
-
-  @After
-  public void afterTest() {
-    kafkaLauncher.stopKafkaServer();
-    kafkaLauncher.stopZookeeper();
-  }
-
-  @Test
-  public void testApplication() throws Exception {
-    try {
-
-      File targetDir = new File(TARGET_DIR);
-      FileUtils.deleteDirectory(targetDir);
-      FileUtils.forceMkdir(targetDir);
-
-      // produce some test data
-      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
-      String[] words = "count the words from kafka and store them in the db".split("\\s+");
-      p.setMessages(Lists.newArrayList(words));
-      new Thread(p).start();
-
-      LocalMode lma = LocalMode.newInstance();
-      Configuration conf = new Configuration(false);
-      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
-      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
-      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
-      conf.set("dt.operator.fileWriter.prop.filePath", TARGET_DIR);
-
-      lma.prepareDAG(new AtomicFileOutputApp(), conf);
-      LocalMode.Controller lc = lma.getController();
-      lc.runAsync(); // test will terminate after results are available
-
-      long timeout = System.currentTimeMillis() + 60000; // 60s timeout
-
-      File outputFile = new File(TARGET_DIR, AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX);
-      while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
-        Thread.sleep(1000);
-        LOG.debug("Waiting for {}", outputFile);
-      }
-
-      Assert.assertTrue("output file exists " + AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
-          outputFile.isFile());
-
-      lc.shutdown();
-
-    } catch (ConstraintViolationException e) {
-      Assert.fail("constraint violations: " + e.getConstraintViolations());
-    }
-  }
-
-}
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
new file mode 100644
index 0000000..91a3d72
--- /dev/null
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.io.File;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.datatorrent.api.Attribute;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+public class ExactlyOnceFileOutputAppTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceFileOutputAppTest.class);
+  private static final String TARGET_DIR = "target/atomicFileOutput";
+
+  private final int brokerPort = NetUtils.getFreeSocketPort();
+  private final int zkPort = NetUtils.getFreeSocketPort();
+
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+  {
+    // required to avoid 50 partitions auto creation
+    this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("num.partitions", "1");
+    this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("offsets.topic.num.partitions", "1");
+  }
+
+  /** Sends words through Kafka into the embedded app and expects an output file that contains "count=2". */
+  @Test
+  public void testApplication() throws Exception
+  {
+    File targetDir = new File(TARGET_DIR);
+    FileUtils.deleteDirectory(targetDir);
+    FileUtils.forceMkdir(targetDir);
+
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+    String topicName = "testTopic";
+    // topic creation is async and the producer may also auto-create it
+    ku.createTopic(topicName, 1);
+
+    // produce test data
+    String[] words = "count count the words from kafka and store them in a file".split("\\s+");
+    for (String word : words) {
+      ku.sendMessages(new KeyedMessage<String, String>(topicName, word));
+    }
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.set("apex.operator.kafkaInput.prop.topics", topicName);
+    conf.set("apex.operator.kafkaInput.prop.clusters", "localhost:" + brokerPort);
+    conf.set("apex.operator.kafkaInput.prop.maxTuplesPerWindow", "2"); // consume at most two words per window
+    conf.set("apex.operator.kafkaInput.prop.initialOffset", "EARLIEST");
+    conf.set("apex.operator.fileWriter.prop.filePath", TARGET_DIR);
+
+    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); // terminate after results are available
+    AppHandle appHandle = launcher.launchApp(new ExactlyOnceFileOutputApp(), conf, launchAttributes);
+    // stop the embedded app even when the wait is interrupted or an assertion fails
+    try {
+      long timeout = System.currentTimeMillis() + 60000; // 60s timeout
+      File outputFile = new File(TARGET_DIR, ExactlyOnceFileOutputApp.FileWriter.FILE_NAME_PREFIX);
+      while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
+        Thread.sleep(1000);
+        LOG.debug("Waiting for {}", outputFile);
+      }
+      Assert.assertTrue("output file exists " + ExactlyOnceFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
+          outputFile.isFile());
+      String result = FileUtils.readFileToString(outputFile);
+      Assert.assertTrue(result.contains("count=2"));
+    } finally {
+      appHandle.shutdown(ShutdownMode.KILL);
+    }
+  }
+
+}
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
new file mode 100644
index 0000000..62bfb74
--- /dev/null
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+public class ExactlyOnceJdbcOutputTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceJdbcOutputTest.class);
+  private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+  private static final String TABLE_NAME = "WORDS";
+
+  private final int brokerPort = NetUtils.getFreeSocketPort();
+  private final int zkPort = NetUtils.getFreeSocketPort();
+
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+  {
+    // required to avoid 50 partitions auto creation
+    this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("num.partitions", "1");
+    this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("offsets.topic.num.partitions", "1");
+  }
+
+  @Before
+  public void beforeTest() throws Exception
+  {
+    // setup hsqldb: load the driver, then create the meta and word tables if they do not exist yet
+    Class.forName(DB_DRIVER).newInstance();
+
+    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+        + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+        + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+        + ")";
+    String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+        + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
+
+    // try-with-resources releases the statement and connection once the tables exist; the in-memory
+    // database is expected to outlive them (HSQLDB default, shutdown=false) - verify if DB_URL changes
+    try (Connection con = DriverManager.getConnection(DB_URL); Statement stmt = con.createStatement()) {
+      stmt.executeUpdate(createMetaTable);
+      stmt.executeUpdate(createTable);
+    }
+  }
+
+  /** Sends words through Kafka into the embedded app and expects one WORDS row per distinct word. */
+  @Test
+  public void testApplication() throws Exception
+  {
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+    String topicName = "testTopic";
+    // topic creation is async and the producer may also auto-create it
+    ku.createTopic(topicName, 1);
+    // produce test data
+    String[] words = "count the words from kafka and store them in the db".split("\\s+");
+    for (String word : words) {
+      ku.sendMessages(new KeyedMessage<String, String>(topicName, word));
+    }
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.set("apex.operator.kafkaInput.prop.topics", topicName);
+    conf.set("apex.operator.kafkaInput.prop.clusters", "localhost:" + brokerPort);
+    conf.set("apex.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+    conf.set("apex.operator.kafkaInput.prop.initialOffset", "EARLIEST");
+    conf.set("apex.operator.store.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("apex.operator.store.prop.store.databaseUrl", DB_URL);
+
+    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); // terminate after results are available
+    AppHandle appHandle = launcher.launchApp(new ExactlyOnceJdbcOutputApp(), conf, launchAttributes);
+    // release the JDBC handles and stop the app even when polling or the assertion fails
+    try (Connection con = DriverManager.getConnection(DB_URL); Statement stmt = con.createStatement()) {
+      HashSet<String> wordsSet = Sets.newHashSet(words);
+      int rowCount = 0;
+      long timeout = System.currentTimeMillis() + 30000; // 30s timeout
+      while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
+        Thread.sleep(500);
+        try (ResultSet resultSet = stmt.executeQuery("SELECT count(*) from " + TABLE_NAME)) {
+          resultSet.next();
+          rowCount = resultSet.getInt(1);
+        }
+        LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
+      }
+      Assert.assertEquals("number of words", wordsSet.size(), rowCount);
+    } finally {
+      appHandle.shutdown(ShutdownMode.KILL);
+    }
+  }
+
+}
diff --git a/examples/exactly-once/src/test/resources/log4j.properties b/examples/exactly-once/src/test/resources/log4j.properties
index dd5910b..b9bee5d 100644
--- a/examples/exactly-once/src/test/resources/log4j.properties
+++ b/examples/exactly-once/src/test/resources/log4j.properties
@@ -1,8 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 log4j.rootLogger=DEBUG,CONSOLE
 
 log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=INFO
 
 log4j.appender.RFA=org.apache.log4j.RollingFileAppender
 log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
@@ -17,7 +38,6 @@ log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m
 log4j.appender.SYSLOG.Facility=LOCAL1
 
 log4j.logger.org=info
-log4j.logger.kafka.server=info
-log4j.logger.kafka.request.logger=info
 #log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=info
+log4j.logger.org.apache.apex=debug
diff --git a/examples/pom.xml b/examples/pom.xml
index 6c76381..cfd8431 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -201,6 +201,7 @@
     <module>ftp</module>
     <module>s3</module>
     <module>jdbc</module>
+    <module>exactly-once</module>
   </modules>
 
   <dependencies>

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.

[apex-malhar] 01/05: Kafka to JDBC exactly-once example.

Posted by th...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit 42bfa560fc9e11f049d6a18725e6f1a009e75483
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Feb 21 20:42:27 2016 -0800

    Kafka to JDBC exactly-once example.
---
 examples/exactly-once/src/assemble/appPackage.xml  |  43 +++++++
 .../java/com/example/myapexapp/Application.java    | 103 +++++++++++++++++
 .../example/myapexapp/RandomNumberGenerator.java   |  47 ++++++++
 .../src/main/resources/META-INF/properties.xml     |  24 ++++
 .../exactly-once/src/site/conf/my-app-conf1.xml    |  11 ++
 .../com/example/myapexapp/ApplicationTest.java     | 124 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  23 ++++
 7 files changed, 375 insertions(+)

diff --git a/examples/exactly-once/src/assemble/appPackage.xml b/examples/exactly-once/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/exactly-once/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
new file mode 100644
index 0000000..8050d67
--- /dev/null
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
@@ -0,0 +1,103 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.BaseUniqueKeyCounter;
+import com.datatorrent.lib.util.KeyValPair;
+
+@ApplicationAnnotation(name="ExactlyOnceExampleApplication")
+public class Application implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
+    kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    UniqueCounter<String> count = dag.addOperator("count", new UniqueCounter<String>());
+    CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
+    store.setStore(new JdbcTransactionalStore());
+    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
+    dag.addStream("words", kafkaInput.outputPort, count.data);
+    dag.addStream("counts", count.count, store.input, cons.input);
+  }
+
+  public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
+  {
+    public static final String SQL =
+        "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
+        + " ON (words.word=I.word)"
+        + " WHEN MATCHED THEN UPDATE SET words.wcount = words.wcount + I.wcount"
+        + " WHEN NOT MATCHED THEN INSERT (word, wcount) VALUES (I.word, I.wcount)";
+
+    @Override
+    protected String getUpdateCommand()
+    {
+      return SQL;
+    }
+
+    @Override
+    protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException
+    {
+      statement.setString(1, tuple.getKey());
+      statement.setInt(2, tuple.getValue());
+    }
+  }
+
+  public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
+  {
+    /**
+     * The input port which receives incoming tuples.
+     */
+    public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
+    {
+      /**
+       * Reference counts tuples
+       */
+      @Override
+      public void process(K tuple)
+      {
+        processTuple(tuple);
+      }
+
+    };
+
+    public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
+    {
+      @Override
+      public Unifier<KeyValPair<K, Integer>> getUnifier()
+      {
+        throw new UnsupportedOperationException("not partitionable");
+      }
+    };
+
+    @Override
+    public void endWindow()
+    {
+      for (Map.Entry<K, MutableInt> e: map.entrySet()) {
+        count.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
+      }
+      map.clear();
+    }
+
+  }
+
+}
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
new file mode 100644
index 0000000..eed344b
--- /dev/null
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
@@ -0,0 +1,47 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is a simple operator that emits random numbers.
+ */
+public class RandomNumberGenerator extends BaseOperator implements InputOperator
+{
+  private int numTuples = 100;
+  private transient int count = 0;
+
+  public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    count = 0;
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    if (count++ < numTuples) {
+      out.emit(Math.random());
+    }
+  }
+
+  public int getNumTuples()
+  {
+    return numTuples;
+  }
+
+  /**
+   * Sets the number of tuples to be emitted every window.
+   * @param numTuples number of tuples
+   */
+  public void setNumTuples(int numTuples)
+  {
+    this.numTuples = numTuples;
+  }
+}
diff --git a/examples/exactly-once/src/main/resources/META-INF/properties.xml b/examples/exactly-once/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..876c39a
--- /dev/null
+++ b/examples/exactly-once/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<configuration>
+  <!-- 
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+  <!-- memory assigned to app master
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+  -->
+  <property>
+    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
+    <value>1000</value>
+  </property>
+  <property>
+    <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
+    <value>hello world: %s</value>
+  </property>
+</configuration>
+
diff --git a/examples/exactly-once/src/site/conf/my-app-conf1.xml b/examples/exactly-once/src/site/conf/my-app-conf1.xml
new file mode 100644
index 0000000..ccb2b66
--- /dev/null
+++ b/examples/exactly-once/src/site/conf/my-app-conf1.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<configuration>
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+  <property>
+    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
+    <value>1000</value>
+  </property>
+</configuration>
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
new file mode 100644
index 0000000..c5aa69c
--- /dev/null
+++ b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
@@ -0,0 +1,124 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
+import com.datatorrent.contrib.kafka.KafkaTestProducer;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.example.myapexapp.Application;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Test the application in local mode.
+ */
+public class ApplicationTest
+{
+  private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+  private static final String KAFKA_TOPIC = "exactly-once-test";
+  private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+  private static final String TABLE_NAME = "WORDS";
+
+  @Before
+  public void beforeTest() throws Exception {
+    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
+    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
+    kafkaLauncher.startZookeeper();
+    kafkaLauncher.startKafkaServer();
+    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
+
+    // setup hsqldb
+    Class.forName(DB_DRIVER).newInstance();
+
+    Connection con = DriverManager.getConnection(DB_URL);
+    Statement stmt = con.createStatement();
+
+    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+        + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+        + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+        + ")";
+    stmt.executeUpdate(createMetaTable);
+
+    String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+        + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
+    stmt.executeUpdate(createTable);
+
+  }
+
+  @After
+  public void afterTest() {
+    kafkaLauncher.stopKafkaServer();
+    kafkaLauncher.stopZookeeper();
+  }
+
+  @Test
+  public void testApplication() throws Exception {
+    try {
+      // produce some test data
+      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
+      String[] words = "count the words from kafka and store them in the db".split("\\s+");
+      p.setMessages(Lists.newArrayList(words));
+      new Thread(p).start();
+
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
+      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+      conf.set("dt.operator.store.prop.store.databaseDriver", DB_DRIVER);
+      conf.set("dt.operator.store.prop.store.databaseUrl", DB_URL);
+
+      lma.prepareDAG(new Application(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync(); // test will terminate after results are available
+
+      HashSet<String> wordsSet = Sets.newHashSet(words);
+      Connection con = DriverManager.getConnection(DB_URL);
+      Statement stmt = con.createStatement();
+      int rowCount = 0;
+      long timeout = System.currentTimeMillis() + 30000; // 30s timeout
+      while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
+        Thread.sleep(1000);
+        String countQuery = "SELECT count(*) from " + TABLE_NAME;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        rowCount = resultSet.getInt(1);
+        resultSet.close();
+        LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
+      }
+      Assert.assertEquals("number of words", wordsSet.size(), rowCount);
+
+      lc.shutdown();
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
diff --git a/examples/exactly-once/src/test/resources/log4j.properties b/examples/exactly-once/src/test/resources/log4j.properties
new file mode 100644
index 0000000..dd5910b
--- /dev/null
+++ b/examples/exactly-once/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+log4j.logger.kafka.server=info
+log4j.logger.kafka.request.logger=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=info

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.

[apex-malhar] 05/05: Expand README and improve formatting.

Posted by th...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit 658cafc31afe2cf2f3dfec5511844c96a2a7fa1a
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Aug 6 08:59:37 2017 -0700

    Expand README and improve formatting.
---
 examples/exactly-once/README.md                    | 25 ++++++++++++++++++++--
 .../exactlyonce/ExactlyOnceJdbcOutputApp.java      | 13 ++++++++---
 .../exactlyonce/ExactlyOnceJdbcOutputTest.java     |  7 +++---
 3 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/examples/exactly-once/README.md b/examples/exactly-once/README.md
index 5254b4c..4c3f10d 100644
--- a/examples/exactly-once/README.md
+++ b/examples/exactly-once/README.md
@@ -1,8 +1,27 @@
 # Examples for end-to-end exactly-once
 
+The examples are a variation of word count to illustrate end-to-end exactly-once processing
+by incorporating the external system integration aspect, which needs to be taken into account when
+developing real-world pipelines:
+
+* Read from Kafka source
+* Windowed count aggregation that emits incremental aggregates
+* Sink that maintains totals accumulating the incremental aggregates (shown for JDBC and file output)
+
+The examples combine the 3 properties that are required for end-to-end exactly-once results:
+
+1. At-least-once processing that guarantees no loss of data
+2. Idempotency in the DAG (Kafka input operator and repeatable/deterministic streaming windows)
+3. Consistent state between DAG and external system, enabled by the output operators.
+
+The test cases show how the applications can be configured to run in embedded mode (including Kafka).
+
 ## Read from Kafka, write to JDBC
 
-This application shows exactly-once output to JDBC through transactions:
+Shows exactly-once output to JDBC through transactions. The JDBC output operator
+keeps track of the streaming window along with the count to avoid duplicate writes on replay
+during recovery. This is an example for continuously updating results in the database,
+enabled by the transactions.
 
 [Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java)
 
@@ -10,7 +29,9 @@ This application shows exactly-once output to JDBC through transactions:
 
 ## Read from Kafka, write to Files
 
-This application shows exactly-once output to HDFS through atomic file operation:
+This application shows exactly-once output to files through an atomic file operation. In contrast to the
+JDBC example, output can only occur once the final count is computed. This implies batching at the sink,
+leading to high latency.
 
 [Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java)
 
diff --git a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
index 33ae9dc..6982833 100644
--- a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
@@ -48,7 +48,8 @@ public class ExactlyOnceJdbcOutputApp implements StreamingApplication
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
+    KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
+        new KafkaSinglePortStringInputOperator());
     kafkaInput.setWindowDataManager(new FSWindowDataManager());
     UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
     CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
@@ -58,7 +59,8 @@ public class ExactlyOnceJdbcOutputApp implements StreamingApplication
     dag.addStream("counts", count.counts, store.input, cons.input);
   }
 
-  public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
+  public static class CountStoreOperator
+      extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
   {
     public static final String SQL =
         "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
@@ -73,13 +75,18 @@ public class ExactlyOnceJdbcOutputApp implements StreamingApplication
     }
 
     @Override
-    protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException
+    protected void setStatementParameters(PreparedStatement statement,
+        KeyValPair<String, Integer> tuple) throws SQLException
     {
       statement.setString(1, tuple.getKey());
       statement.setInt(2, tuple.getValue());
     }
   }
 
+  /**
+   * Extension of {@link UniqueCounter} that emits individual key/value pairs instead
+   * of a map with all modified values.
+   */
   public static class UniqueCounterFlat extends UniqueCounter<String>
   {
     public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
index 62bfb74..5457ec5 100644
--- a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
@@ -76,19 +76,20 @@ public class ExactlyOnceJdbcOutputTest
     Connection con = DriverManager.getConnection(DB_URL);
     Statement stmt = con.createStatement();
 
-    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+    String createMetaTable = "CREATE TABLE IF NOT EXISTS "
+        + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
         + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
         + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
         + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
         + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
-        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", "
+        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
         + ")";
     stmt.executeUpdate(createMetaTable);
 
     String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
         + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
     stmt.executeUpdate(createTable);
-
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.