You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/08 11:07:13 UTC
[apex-malhar] 04/05: APEXMALHAR-2233 Migrate exactly-once examples.
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
commit 9ad844720f2b741bd54cfdda9d5f3a418921e6b1
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Jul 30 23:51:40 2017 -0700
APEXMALHAR-2233 Migrate exactly-once examples.
---
examples/exactly-once/README.md | 17 +++
examples/exactly-once/pom.xml | 62 +++++++++
examples/exactly-once/src/assemble/appPackage.xml | 20 +++
.../example/myapexapp/RandomNumberGenerator.java | 47 -------
.../exactlyonce/ExactlyOnceFileOutputApp.java} | 33 +++--
.../exactlyonce/ExactlyOnceJdbcOutputApp.java} | 56 +++++++--
.../src/main/resources/META-INF/properties.xml | 52 ++++++--
.../exactly-once/src/site/conf/my-app-conf1.xml | 11 --
.../com/example/myapexapp/ApplicationTest.java | 124 ------------------
.../example/myapexapp/AtomicFileOutputAppTest.java | 93 --------------
.../exactlyonce/ExactlyOnceFileOutputAppTest.java | 109 ++++++++++++++++
.../exactlyonce/ExactlyOnceJdbcOutputTest.java | 139 +++++++++++++++++++++
.../src/test/resources/log4j.properties | 24 +++-
examples/pom.xml | 1 +
14 files changed, 479 insertions(+), 309 deletions(-)
diff --git a/examples/exactly-once/README.md b/examples/exactly-once/README.md
new file mode 100644
index 0000000..5254b4c
--- /dev/null
+++ b/examples/exactly-once/README.md
@@ -0,0 +1,17 @@
+# Examples for end-to-end exactly-once
+
+## Read from Kafka, write to JDBC
+
+This application shows exactly-once output to JDBC through transactions:
+
+[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java)
+
+[Test](src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java)
+
+## Read from Kafka, write to Files
+
+This application shows exactly-once output to HDFS through atomic file operation:
+
+[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java)
+
+[Test](src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java)
diff --git a/examples/exactly-once/pom.xml b/examples/exactly-once/pom.xml
new file mode 100644
index 0000000..c198f48
--- /dev/null
+++ b/examples/exactly-once/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.8.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-examples-exactly-once</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar Exactly-Once Examples</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>info.batey.kafka</groupId>
+ <artifactId>kafka-unit</artifactId>
+ <version>0.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>2.3.4</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/examples/exactly-once/src/assemble/appPackage.xml b/examples/exactly-once/src/assemble/appPackage.xml
index 7ad071c..a870807 100644
--- a/examples/exactly-once/src/assemble/appPackage.xml
+++ b/examples/exactly-once/src/assemble/appPackage.xml
@@ -1,3 +1,23 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
deleted file mode 100644
index eed344b..0000000
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * This is a simple operator that emits random number.
- */
-public class RandomNumberGenerator extends BaseOperator implements InputOperator
-{
- private int numTuples = 100;
- private transient int count = 0;
-
- public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
-
- @Override
- public void beginWindow(long windowId)
- {
- count = 0;
- }
-
- @Override
- public void emitTuples()
- {
- if (count++ < numTuples) {
- out.emit(Math.random());
- }
- }
-
- public int getNumTuples()
- {
- return numTuples;
- }
-
- /**
- * Sets the number of tuples to be emitted every window.
- * @param numTuples number of tuples
- */
- public void setNumTuples(int numTuples)
- {
- this.numTuples = numTuples;
- }
-}
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
similarity index 67%
rename from examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
rename to examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
index 55f8cc6..9aa2870 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
@@ -1,32 +1,47 @@
/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package com.example.myapexapp;
+package org.apache.apex.examples.exactlyonce;
+import org.apache.apex.examples.exactlyonce.ExactlyOnceJdbcOutputApp.KafkaSinglePortStringInputOperator;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.datatorrent.lib.util.KeyValPair;
-@ApplicationAnnotation(name = "AtomicFileOutput")
-public class AtomicFileOutputApp implements StreamingApplication
+@ApplicationAnnotation(name = "ExactlyOnceFileOutput")
+public class ExactlyOnceFileOutputApp implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
new KafkaSinglePortStringInputOperator());
- kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+ kafkaInput.setWindowDataManager(new FSWindowDataManager());
- Application.UniqueCounterFlat count = dag.addOperator("count", new Application.UniqueCounterFlat());
+ ExactlyOnceJdbcOutputApp.UniqueCounterFlat count = dag.addOperator("count",
+ new ExactlyOnceJdbcOutputApp.UniqueCounterFlat());
FileWriter fileWriter = dag.addOperator("fileWriter", new FileWriter());
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
similarity index 57%
rename from examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
rename to examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
index 7700d68..33ae9dc 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
@@ -1,38 +1,55 @@
/**
- * Put your copyright and license info here.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package com.example.myapexapp;
+package org.apache.apex.examples.exactlyonce;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
+import java.util.Properties;
+import org.apache.apex.malhar.kafka.AbstractKafkaConsumer;
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.apex.malhar.kafka.KafkaConsumer09;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
-import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
-import com.datatorrent.lib.util.BaseUniqueKeyCounter;
import com.datatorrent.lib.util.KeyValPair;
-@ApplicationAnnotation(name="ExactlyOnceExampleApplication")
-public class Application implements StreamingApplication
+@ApplicationAnnotation(name = "ExactlyOnceJbdcOutput")
+public class ExactlyOnceJdbcOutputApp implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
- kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+ kafkaInput.setWindowDataManager(new FSWindowDataManager());
UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
store.setStore(new JdbcTransactionalStore());
@@ -77,4 +94,21 @@ public class Application implements StreamingApplication
}
}
+ public static class KafkaSinglePortStringInputOperator extends AbstractKafkaInputOperator
+ {
+ public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
+
+ @Override
+ public AbstractKafkaConsumer createConsumer(Properties properties)
+ {
+ return new KafkaConsumer09(properties);
+ }
+
+ @Override
+ protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
+ {
+ outputPort.emit(new String(message.value()));
+ }
+ }
+
}
diff --git a/examples/exactly-once/src/main/resources/META-INF/properties.xml b/examples/exactly-once/src/main/resources/META-INF/properties.xml
index 876c39a..70f8812 100644
--- a/examples/exactly-once/src/main/resources/META-INF/properties.xml
+++ b/examples/exactly-once/src/main/resources/META-INF/properties.xml
@@ -1,24 +1,52 @@
<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
<configuration>
- <!--
+
<property>
- <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
- <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+ <name>apex.operator.kafkaInput.prop.initialPartitionCount</name>
+ <value>1</value>
</property>
- -->
- <!-- memory assigned to app master
<property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1024</value>
+ <name>apex.operator.kafkaInput.prop.topics</name>
+ <value>exactly-once-example</value>
</property>
- -->
<property>
- <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
- <value>1000</value>
+ <name>apex.operator.kafkaInput.prop.clusters</name>
+ <value>localhost:9092</value>
</property>
+
<property>
- <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
- <value>hello world: %s</value>
+ <name>apex.application.ExactlyOnceJbdcOutput.operator.store.prop.store.databaseDriver</name>
+ <value>org.hsqldb.jdbcDriver</value>
</property>
+ <property>
+ <name>apex.application.ExactlyOnceJbdcOutput.operator.store.prop.store.databaseUrl</name>
+ <value>jdbc:hsqldb:mem:test;sql.syntax_mys=true</value>
+ </property>
+
+ <property>
+ <name>apex.application.ExactlyOnceFileOutput.operator.fileWriter.prop.filePath</name>
+ <value>target/atomicFileOutput</value>
+ </property>
+
</configuration>
diff --git a/examples/exactly-once/src/site/conf/my-app-conf1.xml b/examples/exactly-once/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index ccb2b66..0000000
--- a/examples/exactly-once/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<configuration>
- <property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1024</value>
- </property>
- <property>
- <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
- <value>1000</value>
- </property>
-</configuration>
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index c5aa69c..0000000
--- a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import java.io.File;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.HashSet;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
-import com.datatorrent.contrib.kafka.KafkaTestProducer;
-import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
-import com.example.myapexapp.Application;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Test the application in local mode.
- */
-public class ApplicationTest
-{
- private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
- private static final String KAFKA_TOPIC = "exactly-once-test";
- private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
- private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
- private static final String TABLE_NAME = "WORDS";
-
- @Before
- public void beforeTest() throws Exception {
- kafkaLauncher.baseDir = "target/" + this.getClass().getName();
- FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
- kafkaLauncher.startZookeeper();
- kafkaLauncher.startKafkaServer();
- kafkaLauncher.createTopic(0, KAFKA_TOPIC);
-
- // setup hsqldb
- Class.forName(DB_DRIVER).newInstance();
-
- Connection con = DriverManager.getConnection(DB_URL);
- Statement stmt = con.createStatement();
-
- String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
- + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
- + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
- + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
- + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
- + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
- + ")";
- stmt.executeUpdate(createMetaTable);
-
- String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
- + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
- stmt.executeUpdate(createTable);
-
- }
-
- @After
- public void afterTest() {
- kafkaLauncher.stopKafkaServer();
- kafkaLauncher.stopZookeeper();
- }
-
- @Test
- public void testApplication() throws Exception {
- try {
- // produce some test data
- KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
- String[] words = "count the words from kafka and store them in the db".split("\\s+");
- p.setMessages(Lists.newArrayList(words));
- new Thread(p).start();
-
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
- conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
- conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
- conf.set("dt.operator.store.prop.store.databaseDriver", DB_DRIVER);
- conf.set("dt.operator.store.prop.store.databaseUrl", DB_URL);
-
- lma.prepareDAG(new Application(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.runAsync(); // test will terminate after results are available
-
- HashSet<String> wordsSet = Sets.newHashSet(words);
- Connection con = DriverManager.getConnection(DB_URL);
- Statement stmt = con.createStatement();
- int rowCount = 0;
- long timeout = System.currentTimeMillis() + 30000; // 30s timeout
- while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
- Thread.sleep(1000);
- String countQuery = "SELECT count(*) from " + TABLE_NAME;
- ResultSet resultSet = stmt.executeQuery(countQuery);
- resultSet.next();
- rowCount = resultSet.getInt(1);
- resultSet.close();
- LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
- }
- Assert.assertEquals("number of words", wordsSet.size(), rowCount);
-
- lc.shutdown();
-
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
-}
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
deleted file mode 100644
index b539394..0000000
--- a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.example.myapexapp;
-
-import java.io.File;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
-import com.datatorrent.contrib.kafka.KafkaTestProducer;
-
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-public class AtomicFileOutputAppTest
-{
- private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
- private static final String KAFKA_TOPIC = "exactly-once-test";
- private static final String TARGET_DIR = "target/atomicFileOutput";
-
- @Before
- public void beforeTest() throws Exception {
- kafkaLauncher.baseDir = "target/" + this.getClass().getName();
- FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
- kafkaLauncher.startZookeeper();
- kafkaLauncher.startKafkaServer();
- kafkaLauncher.createTopic(0, KAFKA_TOPIC);
- }
-
- @After
- public void afterTest() {
- kafkaLauncher.stopKafkaServer();
- kafkaLauncher.stopZookeeper();
- }
-
- @Test
- public void testApplication() throws Exception {
- try {
-
- File targetDir = new File(TARGET_DIR);
- FileUtils.deleteDirectory(targetDir);
- FileUtils.forceMkdir(targetDir);
-
- // produce some test data
- KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
- String[] words = "count the words from kafka and store them in the db".split("\\s+");
- p.setMessages(Lists.newArrayList(words));
- new Thread(p).start();
-
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
- conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
- conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
- conf.set("dt.operator.fileWriter.prop.filePath", TARGET_DIR);
-
- lma.prepareDAG(new AtomicFileOutputApp(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.runAsync(); // test will terminate after results are available
-
- long timeout = System.currentTimeMillis() + 60000; // 60s timeout
-
- File outputFile = new File(TARGET_DIR, AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX);
- while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
- Thread.sleep(1000);
- LOG.debug("Waiting for {}", outputFile);
- }
-
- Assert.assertTrue("output file exists " + AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
- outputFile.isFile());
-
- lc.shutdown();
-
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
-}
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
new file mode 100644
index 0000000..91a3d72
--- /dev/null
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.io.File;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.datatorrent.api.Attribute;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+public class ExactlyOnceFileOutputAppTest
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceJdbcOutputTest.class);
+ private static final String TARGET_DIR = "target/atomicFileOutput";
+
+ private final int brokerPort = NetUtils.getFreeSocketPort();
+ private final int zkPort = NetUtils.getFreeSocketPort();
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+ {
+ // required to avoid auto-creation of 50 partitions
+ this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("num.partitions", "1");
+ this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("offsets.topic.num.partitions", "1");
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ File targetDir = new File(TARGET_DIR);
+ FileUtils.deleteDirectory(targetDir);
+ FileUtils.forceMkdir(targetDir);
+
+ KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+ String topicName = "testTopic";
+ // topic creation is async and the producer may also auto-create it
+ ku.createTopic(topicName, 1);
+
+ // produce test data
+ String[] words = "count count the words from kafka and store them in a file".split("\\s+");
+ for (String word : words) {
+ ku.sendMessages(new KeyedMessage<String, String>(topicName, word));
+ }
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.set("apex.operator.kafkaInput.prop.topics", topicName);
+ conf.set("apex.operator.kafkaInput.prop.clusters", "localhost:" + brokerPort);
+ conf.set("apex.operator.kafkaInput.prop.maxTuplesPerWindow", "2"); // consume two words per window
+ conf.set("apex.operator.kafkaInput.prop.initialOffset", "EARLIEST");
+ conf.set("apex.operator.fileWriter.prop.filePath", TARGET_DIR);
+
+ EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+ Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); // terminate after results are available
+ AppHandle appHandle = launcher.launchApp(new ExactlyOnceFileOutputApp(), conf, launchAttributes);
+
+ long timeout = System.currentTimeMillis() + 60000; // 60s timeout
+
+ File outputFile = new File(TARGET_DIR, ExactlyOnceFileOutputApp.FileWriter.FILE_NAME_PREFIX);
+ while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
+ Thread.sleep(1000);
+ LOG.debug("Waiting for {}", outputFile);
+ }
+
+ Assert.assertTrue("output file exists " + ExactlyOnceFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
+ outputFile.isFile());
+
+ String result = FileUtils.readFileToString(outputFile);
+ Assert.assertTrue(result.contains("count=2"));
+
+ appHandle.shutdown(ShutdownMode.KILL);
+ }
+
+}
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
new file mode 100644
index 0000000..62bfb74
--- /dev/null
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+public class ExactlyOnceJdbcOutputTest
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceJdbcOutputTest.class);
+ private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+ private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+ private static final String TABLE_NAME = "WORDS";
+
+ private final int brokerPort = NetUtils.getFreeSocketPort();
+ private final int zkPort = NetUtils.getFreeSocketPort();
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+ {
+ // required to avoid 50 partitions auto creation
+ this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("num.partitions", "1");
+ this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("offsets.topic.num.partitions", "1");
+ }
+
+ @Before
+ public void beforeTest() throws Exception
+ {
+ // setup hsqldb
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(DB_URL);
+ Statement stmt = con.createStatement();
+
+ String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + ")";
+ stmt.executeUpdate(createMetaTable);
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
+ stmt.executeUpdate(createTable);
+
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+ String topicName = "testTopic";
+ // topic creation is async and the producer may also auto-create it
+ ku.createTopic(topicName, 1);
+
+ // produce test data
+ String[] words = "count the words from kafka and store them in the db".split("\\s+");
+ for (String word : words) {
+ ku.sendMessages(new KeyedMessage<String, String>(topicName, word));
+ }
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.set("apex.operator.kafkaInput.prop.topics", topicName);
+ conf.set("apex.operator.kafkaInput.prop.clusters", "localhost:" + brokerPort);
+ conf.set("apex.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+ conf.set("apex.operator.kafkaInput.prop.initialOffset", "EARLIEST");
+ conf.set("apex.operator.store.prop.store.databaseDriver", DB_DRIVER);
+ conf.set("apex.operator.store.prop.store.databaseUrl", DB_URL);
+
+ EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+ Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); // terminate after results are available
+ AppHandle appHandle = launcher.launchApp(new ExactlyOnceJdbcOutputApp(), conf, launchAttributes);
+ HashSet<String> wordsSet = Sets.newHashSet(words);
+ Connection con = DriverManager.getConnection(DB_URL);
+ Statement stmt = con.createStatement();
+ int rowCount = 0;
+ long timeout = System.currentTimeMillis() + 30000; // 30s timeout
+ while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
+ Thread.sleep(500);
+ String countQuery = "SELECT count(*) from " + TABLE_NAME;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ rowCount = resultSet.getInt(1);
+ resultSet.close();
+ LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
+ }
+ Assert.assertEquals("number of words", wordsSet.size(), rowCount);
+ appHandle.shutdown(ShutdownMode.KILL);
+ }
+
+}
diff --git a/examples/exactly-once/src/test/resources/log4j.properties b/examples/exactly-once/src/test/resources/log4j.properties
index dd5910b..b9bee5d 100644
--- a/examples/exactly-once/src/test/resources/log4j.properties
+++ b/examples/exactly-once/src/test/resources/log4j.properties
@@ -1,8 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
log4j.rootLogger=DEBUG,CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
@@ -17,7 +38,6 @@ log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m
log4j.appender.SYSLOG.Facility=LOCAL1
log4j.logger.org=info
-log4j.logger.kafka.server=info
-log4j.logger.kafka.request.logger=info
#log4j.logger.org.apache.commons.beanutils=warn
log4j.logger.com.datatorrent=info
+log4j.logger.org.apache.apex=debug
diff --git a/examples/pom.xml b/examples/pom.xml
index 6c76381..cfd8431 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -201,6 +201,7 @@
<module>ftp</module>
<module>s3</module>
<module>jdbc</module>
+ <module>exactly-once</module>
</modules>
<dependencies>
--
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.