You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/08 11:07:13 UTC

[apex-malhar] 04/05: APEXMALHAR-2233 Migrate exactly-once examples.

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit 9ad844720f2b741bd54cfdda9d5f3a418921e6b1
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Jul 30 23:51:40 2017 -0700

    APEXMALHAR-2233 Migrate exactly-once examples.
---
 examples/exactly-once/README.md                    |  17 +++
 examples/exactly-once/pom.xml                      |  62 +++++++++
 examples/exactly-once/src/assemble/appPackage.xml  |  20 +++
 .../example/myapexapp/RandomNumberGenerator.java   |  47 -------
 .../exactlyonce/ExactlyOnceFileOutputApp.java}     |  33 +++--
 .../exactlyonce/ExactlyOnceJdbcOutputApp.java}     |  56 +++++++--
 .../src/main/resources/META-INF/properties.xml     |  52 ++++++--
 .../exactly-once/src/site/conf/my-app-conf1.xml    |  11 --
 .../com/example/myapexapp/ApplicationTest.java     | 124 ------------------
 .../example/myapexapp/AtomicFileOutputAppTest.java |  93 --------------
 .../exactlyonce/ExactlyOnceFileOutputAppTest.java  | 109 ++++++++++++++++
 .../exactlyonce/ExactlyOnceJdbcOutputTest.java     | 139 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  24 +++-
 examples/pom.xml                                   |   1 +
 14 files changed, 479 insertions(+), 309 deletions(-)

diff --git a/examples/exactly-once/README.md b/examples/exactly-once/README.md
new file mode 100644
index 0000000..5254b4c
--- /dev/null
+++ b/examples/exactly-once/README.md
@@ -0,0 +1,17 @@
+# Examples for end-to-end exactly-once
+
+## Read from Kafka, write to JDBC
+
+This application shows exactly-once output to JDBC through transactions:
+
+[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java)
+
+[Test](src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java)
+
+## Read from Kafka, write to Files
+
+This application shows exactly-once output to HDFS through atomic file operations:
+
+[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java)
+
+[Test](src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java)
diff --git a/examples/exactly-once/pom.xml b/examples/exactly-once/pom.xml
new file mode 100644
index 0000000..c198f48
--- /dev/null
+++ b/examples/exactly-once/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-examples-exactly-once</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Exactly-Once Examples</name>
+  
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-kafka</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>info.batey.kafka</groupId>
+      <artifactId>kafka-unit</artifactId>
+      <version>0.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>2.3.4</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/examples/exactly-once/src/assemble/appPackage.xml b/examples/exactly-once/src/assemble/appPackage.xml
index 7ad071c..a870807 100644
--- a/examples/exactly-once/src/assemble/appPackage.xml
+++ b/examples/exactly-once/src/assemble/appPackage.xml
@@ -1,3 +1,23 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
deleted file mode 100644
index eed344b..0000000
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * This is a simple operator that emits random number.
- */
-public class RandomNumberGenerator extends BaseOperator implements InputOperator
-{
-  private int numTuples = 100;
-  private transient int count = 0;
-
-  public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    count = 0;
-  }
-
-  @Override
-  public void emitTuples()
-  {
-    if (count++ < numTuples) {
-      out.emit(Math.random());
-    }
-  }
-
-  public int getNumTuples()
-  {
-    return numTuples;
-  }
-
-  /**
-   * Sets the number of tuples to be emitted every window.
-   * @param numTuples number of tuples
-   */
-  public void setNumTuples(int numTuples)
-  {
-    this.numTuples = numTuples;
-  }
-}
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
similarity index 67%
rename from examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
rename to examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
index 55f8cc6..9aa2870 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
@@ -1,32 +1,47 @@
 /**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package com.example.myapexapp;
+package org.apache.apex.examples.exactlyonce;
 
+import org.apache.apex.examples.exactlyonce.ExactlyOnceJdbcOutputApp.KafkaSinglePortStringInputOperator;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
 import com.datatorrent.lib.util.KeyValPair;
 
-@ApplicationAnnotation(name = "AtomicFileOutput")
-public class AtomicFileOutputApp implements StreamingApplication
+@ApplicationAnnotation(name = "ExactlyOnceFileOutput")
+public class ExactlyOnceFileOutputApp implements StreamingApplication
 {
   @Override
   public void populateDAG(DAG dag, Configuration configuration)
   {
     KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
         new KafkaSinglePortStringInputOperator());
-    kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    kafkaInput.setWindowDataManager(new FSWindowDataManager());
 
-    Application.UniqueCounterFlat count = dag.addOperator("count", new Application.UniqueCounterFlat());
+    ExactlyOnceJdbcOutputApp.UniqueCounterFlat count = dag.addOperator("count",
+        new ExactlyOnceJdbcOutputApp.UniqueCounterFlat());
 
     FileWriter fileWriter = dag.addOperator("fileWriter", new FileWriter());
 
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
similarity index 57%
rename from examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
rename to examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
index 7700d68..33ae9dc 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
@@ -1,38 +1,55 @@
 /**
- * Put your copyright and license info here.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package com.example.myapexapp;
+package org.apache.apex.examples.exactlyonce;
 
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Map;
+import java.util.Properties;
 
+import org.apache.apex.malhar.kafka.AbstractKafkaConsumer;
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.apex.malhar.kafka.KafkaConsumer09;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
 import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
-import com.datatorrent.lib.util.BaseUniqueKeyCounter;
 import com.datatorrent.lib.util.KeyValPair;
 
-@ApplicationAnnotation(name="ExactlyOnceExampleApplication")
-public class Application implements StreamingApplication
+@ApplicationAnnotation(name = "ExactlyOnceJbdcOutput")
+public class ExactlyOnceJdbcOutputApp implements StreamingApplication
 {
 
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
-    kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    kafkaInput.setWindowDataManager(new FSWindowDataManager());
     UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
     CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
     store.setStore(new JdbcTransactionalStore());
@@ -77,4 +94,21 @@ public class Application implements StreamingApplication
     }
   }
 
+  public static class KafkaSinglePortStringInputOperator extends AbstractKafkaInputOperator
+  {
+    public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
+
+    @Override
+    public AbstractKafkaConsumer createConsumer(Properties properties)
+    {
+      return new KafkaConsumer09(properties);
+    }
+
+    @Override
+    protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
+    {
+      outputPort.emit(new String(message.value()));
+    }
+  }
+
 }
diff --git a/examples/exactly-once/src/main/resources/META-INF/properties.xml b/examples/exactly-once/src/main/resources/META-INF/properties.xml
index 876c39a..70f8812 100644
--- a/examples/exactly-once/src/main/resources/META-INF/properties.xml
+++ b/examples/exactly-once/src/main/resources/META-INF/properties.xml
@@ -1,24 +1,52 @@
 <?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <configuration>
-  <!-- 
+
   <property>
-    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
-    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+    <name>apex.operator.kafkaInput.prop.initialPartitionCount</name>
+    <value>1</value>
   </property>
-  -->
-  <!-- memory assigned to app master
   <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
+    <name>apex.operator.kafkaInput.prop.topics</name>
+    <value>exactly-once-example</value>
   </property>
-  -->
   <property>
-    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
-    <value>1000</value>
+    <name>apex.operator.kafkaInput.prop.clusters</name>
+    <value>localhost:9092</value>
   </property>
+  
   <property>
-    <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
-    <value>hello world: %s</value>
+    <name>apex.application.ExactlyOnceJbdcOutput.operator.store.prop.store.databaseDriver</name>
+    <value>org.hsqldb.jdbcDriver</value>
   </property>
+  <property>
+    <name>apex.application.ExactlyOnceJbdcOutput.operator.store.prop.store.databaseUrl</name>
+    <value>jdbc:hsqldb:mem:test;sql.syntax_mys=true</value>
+  </property>
+
+  <property>
+    <name>apex.application.ExactlyOnceFileOutput.operator.fileWriter.prop.filePath</name>
+    <value>target/atomicFileOutput</value>
+  </property>
+
 </configuration>
 
diff --git a/examples/exactly-once/src/site/conf/my-app-conf1.xml b/examples/exactly-once/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index ccb2b66..0000000
--- a/examples/exactly-once/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<configuration>
-  <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-  <property>
-    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
-    <value>1000</value>
-  </property>
-</configuration>
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index c5aa69c..0000000
--- a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import java.io.File;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.HashSet;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
-import com.datatorrent.contrib.kafka.KafkaTestProducer;
-import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
-import com.example.myapexapp.Application;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Test the application in local mode.
- */
-public class ApplicationTest
-{
-  private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-  private static final String KAFKA_TOPIC = "exactly-once-test";
-  private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
-  private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
-  private static final String TABLE_NAME = "WORDS";
-
-  @Before
-  public void beforeTest() throws Exception {
-    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
-    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
-    kafkaLauncher.startZookeeper();
-    kafkaLauncher.startKafkaServer();
-    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
-
-    // setup hsqldb
-    Class.forName(DB_DRIVER).newInstance();
-
-    Connection con = DriverManager.getConnection(DB_URL);
-    Statement stmt = con.createStatement();
-
-    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
-        + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
-        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
-        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
-        + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
-        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
-        + ")";
-    stmt.executeUpdate(createMetaTable);
-
-    String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
-        + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
-    stmt.executeUpdate(createTable);
-
-  }
-
-  @After
-  public void afterTest() {
-    kafkaLauncher.stopKafkaServer();
-    kafkaLauncher.stopZookeeper();
-  }
-
-  @Test
-  public void testApplication() throws Exception {
-    try {
-      // produce some test data
-      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
-      String[] words = "count the words from kafka and store them in the db".split("\\s+");
-      p.setMessages(Lists.newArrayList(words));
-      new Thread(p).start();
-
-      LocalMode lma = LocalMode.newInstance();
-      Configuration conf = new Configuration(false);
-      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
-      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
-      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
-      conf.set("dt.operator.store.prop.store.databaseDriver", DB_DRIVER);
-      conf.set("dt.operator.store.prop.store.databaseUrl", DB_URL);
-
-      lma.prepareDAG(new Application(), conf);
-      LocalMode.Controller lc = lma.getController();
-      lc.runAsync(); // test will terminate after results are available
-
-      HashSet<String> wordsSet = Sets.newHashSet(words);
-      Connection con = DriverManager.getConnection(DB_URL);
-      Statement stmt = con.createStatement();
-      int rowCount = 0;
-      long timeout = System.currentTimeMillis() + 30000; // 30s timeout
-      while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
-        Thread.sleep(1000);
-        String countQuery = "SELECT count(*) from " + TABLE_NAME;
-        ResultSet resultSet = stmt.executeQuery(countQuery);
-        resultSet.next();
-        rowCount = resultSet.getInt(1);
-        resultSet.close();
-        LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
-      }
-      Assert.assertEquals("number of words", wordsSet.size(), rowCount);
-
-      lc.shutdown();
-
-    } catch (ConstraintViolationException e) {
-      Assert.fail("constraint violations: " + e.getConstraintViolations());
-    }
-  }
-
-}
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
deleted file mode 100644
index b539394..0000000
--- a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.example.myapexapp;
-
-import java.io.File;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
-import com.datatorrent.contrib.kafka.KafkaTestProducer;
-
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-public class AtomicFileOutputAppTest
-{
-  private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-  private static final String KAFKA_TOPIC = "exactly-once-test";
-  private static final String TARGET_DIR = "target/atomicFileOutput";
-
-  @Before
-  public void beforeTest() throws Exception {
-    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
-    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
-    kafkaLauncher.startZookeeper();
-    kafkaLauncher.startKafkaServer();
-    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
-  }
-
-  @After
-  public void afterTest() {
-    kafkaLauncher.stopKafkaServer();
-    kafkaLauncher.stopZookeeper();
-  }
-
-  @Test
-  public void testApplication() throws Exception {
-    try {
-
-      File targetDir = new File(TARGET_DIR);
-      FileUtils.deleteDirectory(targetDir);
-      FileUtils.forceMkdir(targetDir);
-
-      // produce some test data
-      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
-      String[] words = "count the words from kafka and store them in the db".split("\\s+");
-      p.setMessages(Lists.newArrayList(words));
-      new Thread(p).start();
-
-      LocalMode lma = LocalMode.newInstance();
-      Configuration conf = new Configuration(false);
-      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
-      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
-      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
-      conf.set("dt.operator.fileWriter.prop.filePath", TARGET_DIR);
-
-      lma.prepareDAG(new AtomicFileOutputApp(), conf);
-      LocalMode.Controller lc = lma.getController();
-      lc.runAsync(); // test will terminate after results are available
-
-      long timeout = System.currentTimeMillis() + 60000; // 60s timeout
-
-      File outputFile = new File(TARGET_DIR, AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX);
-      while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
-        Thread.sleep(1000);
-        LOG.debug("Waiting for {}", outputFile);
-      }
-
-      Assert.assertTrue("output file exists " + AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
-          outputFile.isFile());
-
-      lc.shutdown();
-
-    } catch (ConstraintViolationException e) {
-      Assert.fail("constraint violations: " + e.getConstraintViolations());
-    }
-  }
-
-}
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
new file mode 100644
index 0000000..91a3d72
--- /dev/null
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.io.File;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.datatorrent.api.Attribute;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+public class ExactlyOnceFileOutputAppTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceJdbcOutputTest.class);
+  private static final String TARGET_DIR = "target/atomicFileOutput";
+
+  private final int brokerPort = NetUtils.getFreeSocketPort();
+  private final int zkPort = NetUtils.getFreeSocketPort();
+
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+  {
+    // required to avoid 50 partitions auto creation
+    this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("num.partitions", "1");
+    this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("offsets.topic.num.partitions", "1");
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    File targetDir = new File(TARGET_DIR);
+    FileUtils.deleteDirectory(targetDir);
+    FileUtils.forceMkdir(targetDir);
+
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+    String topicName = "testTopic";
+    // topic creation is async and the producer may also auto-create it
+    ku.createTopic(topicName, 1);
+
+    // produce test data
+    String[] words = "count count the words from kafka and store them in a file".split("\\s+");
+    for (String word : words) {
+      ku.sendMessages(new KeyedMessage<String, String>(topicName, word));
+    }
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.set("apex.operator.kafkaInput.prop.topics", topicName);
+    conf.set("apex.operator.kafkaInput.prop.clusters", "localhost:" + brokerPort);
+    conf.set("apex.operator.kafkaInput.prop.maxTuplesPerWindow", "2"); // consume at most two words per window
+    conf.set("apex.operator.kafkaInput.prop.initialOffset", "EARLIEST");
+    conf.set("apex.operator.fileWriter.prop.filePath", TARGET_DIR);
+
+    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); // terminate after results are available
+    AppHandle appHandle = launcher.launchApp(new ExactlyOnceFileOutputApp(), conf, launchAttributes);
+
+    long timeout = System.currentTimeMillis() + 60000; // 60s timeout
+
+    File outputFile = new File(TARGET_DIR, ExactlyOnceFileOutputApp.FileWriter.FILE_NAME_PREFIX);
+    while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
+      Thread.sleep(1000);
+      LOG.debug("Waiting for {}", outputFile);
+    }
+
+    Assert.assertTrue("output file exists " + ExactlyOnceFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
+        outputFile.isFile());
+
+    String result = FileUtils.readFileToString(outputFile);
+    Assert.assertTrue(result.contains("count=2"));
+
+    appHandle.shutdown(ShutdownMode.KILL);
+  }
+
+}
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
new file mode 100644
index 0000000..62bfb74
--- /dev/null
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+/** Runs {@link ExactlyOnceJdbcOutputApp} embedded against a KafkaUnit broker and an in-memory HSQLDB. */
+public class ExactlyOnceJdbcOutputTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceJdbcOutputTest.class);
+  private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+  private static final String TABLE_NAME = "WORDS";
+
+  private final int brokerPort = NetUtils.getFreeSocketPort();
+  private final int zkPort = NetUtils.getFreeSocketPort();
+
+  @Rule
+  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+  {
+    // required to avoid 50 partitions auto creation
+    this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("num.partitions", "1");
+    this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("offsets.topic.num.partitions", "1");
+  }
+
+  /** Creates the meta and output tables; the mem: database is expected to outlive the connection closed here. */
+  @Before
+  public void beforeTest() throws Exception
+  {
+    // setup hsqldb
+    Class.forName(DB_DRIVER).newInstance();
+
+    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+        + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+        + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+        + ")";
+    String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+        + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
+    try (Connection con = DriverManager.getConnection(DB_URL); Statement stmt = con.createStatement()) {
+      stmt.executeUpdate(createMetaTable);
+      stmt.executeUpdate(createTable);
+    }
+  }
+
+  /** Sends words to Kafka, launches the app and polls the table until each distinct word has a row. */
+  @Test
+  public void testApplication() throws Exception
+  {
+    KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+    String topicName = "testTopic";
+    // topic creation is async and the producer may also auto-create it
+    ku.createTopic(topicName, 1);
+
+    // produce test data
+    String[] words = "count the words from kafka and store them in the db".split("\\s+");
+    for (String word : words) {
+      ku.sendMessages(new KeyedMessage<String, String>(topicName, word));
+    }
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.set("apex.operator.kafkaInput.prop.topics", topicName);
+    conf.set("apex.operator.kafkaInput.prop.clusters", "localhost:" + brokerPort);
+    conf.set("apex.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+    conf.set("apex.operator.kafkaInput.prop.initialOffset", "EARLIEST");
+    conf.set("apex.operator.store.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("apex.operator.store.prop.store.databaseUrl", DB_URL);
+
+    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); // terminate after results are available
+    AppHandle appHandle = launcher.launchApp(new ExactlyOnceJdbcOutputApp(), conf, launchAttributes);
+    HashSet<String> wordsSet = Sets.newHashSet(words);
+    int rowCount = 0;
+    long timeout = System.currentTimeMillis() + 30000; // 30s timeout
+    // the finally block shuts the app down even when the assertion or a JDBC call fails
+    try (Connection con = DriverManager.getConnection(DB_URL); Statement stmt = con.createStatement()) {
+      while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
+        Thread.sleep(500);
+        try (ResultSet resultSet = stmt.executeQuery("SELECT count(*) from " + TABLE_NAME)) {
+          resultSet.next();
+          rowCount = resultSet.getInt(1);
+        }
+        LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
+      }
+      Assert.assertEquals("number of words", wordsSet.size(), rowCount);
+    } finally {
+      appHandle.shutdown(ShutdownMode.KILL);
+    }
+  }
+}
diff --git a/examples/exactly-once/src/test/resources/log4j.properties b/examples/exactly-once/src/test/resources/log4j.properties
index dd5910b..b9bee5d 100644
--- a/examples/exactly-once/src/test/resources/log4j.properties
+++ b/examples/exactly-once/src/test/resources/log4j.properties
@@ -1,8 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 log4j.rootLogger=DEBUG,CONSOLE
 
 log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=INFO
 
 log4j.appender.RFA=org.apache.log4j.RollingFileAppender
 log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
@@ -17,7 +38,6 @@ log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m
 log4j.appender.SYSLOG.Facility=LOCAL1
 
 log4j.logger.org=info
-log4j.logger.kafka.server=info
-log4j.logger.kafka.request.logger=info
 #log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=info
+log4j.logger.org.apache.apex=debug
diff --git a/examples/pom.xml b/examples/pom.xml
index 6c76381..cfd8431 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -201,6 +201,7 @@
     <module>ftp</module>
     <module>s3</module>
     <module>jdbc</module>
+    <module>exactly-once</module>
   </modules>
 
   <dependencies>

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.