You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/08 11:07:10 UTC
[apex-malhar] 01/05: Kafka to JDBC exactly-once example.
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
commit 42bfa560fc9e11f049d6a18725e6f1a009e75483
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Feb 21 20:42:27 2016 -0800
Kafka to JDBC exactly-once example.
---
examples/exactly-once/src/assemble/appPackage.xml | 43 +++++++
.../java/com/example/myapexapp/Application.java | 103 +++++++++++++++++
.../example/myapexapp/RandomNumberGenerator.java | 47 ++++++++
.../src/main/resources/META-INF/properties.xml | 24 ++++
.../exactly-once/src/site/conf/my-app-conf1.xml | 11 ++
.../com/example/myapexapp/ApplicationTest.java | 124 +++++++++++++++++++++
.../src/test/resources/log4j.properties | 23 ++++
7 files changed, 375 insertions(+)
diff --git a/examples/exactly-once/src/assemble/appPackage.xml b/examples/exactly-once/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/exactly-once/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<!--
+ Maven assembly descriptor with id "appPackage". It produces a jar (without a base
+ directory) laid out as follows:
+   /app       : the project jar from target/, plus anything in src/main/resources/app
+   /lib       : the contents of target/deps
+   /conf      : the *.xml files from src/site/conf
+   /META-INF  : the contents of src/main/resources/META-INF
+   /resources : the contents of src/main/resources/resources
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <!-- the application jar built by this project -->
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <!-- everything found in target/deps is placed under /lib -->
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <!-- XML configuration files shipped with the package -->
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/resources</directory>
+ <outputDirectory>/resources</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
new file mode 100644
index 0000000..8050d67
--- /dev/null
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
@@ -0,0 +1,103 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.BaseUniqueKeyCounter;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Example application that reads strings from Kafka, counts how often each distinct
+ * string occurs within a streaming window, and hands the per-window counts to a JDBC
+ * output operator as well as to the console.
+ */
+@ApplicationAnnotation(name="ExactlyOnceExampleApplication")
+public class Application implements StreamingApplication
+{
+
+  /**
+   * Builds the DAG: {@code kafkaInput -> count -> (store, console)}.
+   *
+   * @param dag the DAG to which operators and streams are added
+   * @param conf the launch configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // source: Kafka input backed by a file-system idempotent storage manager
+    KafkaSinglePortStringInputOperator source = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
+    IdempotentStorageManager.FSIdempotentStorageManager storageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+    source.setIdempotentStorageManager(storageManager);
+
+    // per-window counter of distinct words
+    UniqueCounter<String> wordCounter = dag.addOperator("count", new UniqueCounter<String>());
+
+    // sinks: JDBC store and console
+    CountStoreOperator jdbcSink = dag.addOperator("store", new CountStoreOperator());
+    JdbcTransactionalStore txStore = new JdbcTransactionalStore();
+    jdbcSink.setStore(txStore);
+    ConsoleOutputOperator consoleSink = dag.addOperator("console", new ConsoleOutputOperator());
+
+    dag.addStream("words", source.outputPort, wordCounter.data);
+    dag.addStream("counts", wordCounter.count, jdbcSink.input, consoleSink.input);
+  }
+
+  /**
+   * JDBC output operator that merges (word, count) pairs into the {@code words} table:
+   * an existing row has the incoming count added to it, a missing row is inserted.
+   */
+  public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
+  {
+    /** Statement executed for every tuple; parameter 1 is the word, parameter 2 its count. */
+    public static final String SQL =
+        "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
+        + " ON (words.word=I.word)"
+        + " WHEN MATCHED THEN UPDATE SET words.wcount = words.wcount + I.wcount"
+        + " WHEN NOT MATCHED THEN INSERT (word, wcount) VALUES (I.word, I.wcount)";
+
+    /**
+     * @return the MERGE statement held in {@link #SQL}
+     */
+    @Override
+    protected String getUpdateCommand()
+    {
+      return SQL;
+    }
+
+    /**
+     * Binds the tuple's key and value to the two placeholders of {@link #SQL}.
+     *
+     * @param statement the prepared statement to populate
+     * @param tuple the word and its count
+     * @throws SQLException if a parameter cannot be set
+     */
+    @Override
+    protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException
+    {
+      final String word = tuple.getKey();
+      statement.setString(1, word);
+      final int wordCount = tuple.getValue();
+      statement.setInt(2, wordCount);
+    }
+  }
+
+  /**
+   * Counts occurrences of each key received during a window and emits one
+   * (key, count) pair per key when the window ends.
+   *
+   * @param <K> the key type
+   */
+  public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
+  {
+    /**
+     * Input port; every incoming tuple is passed to {@code processTuple}.
+     */
+    public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
+    {
+      @Override
+      public void process(K tuple)
+      {
+        processTuple(tuple);
+      }
+    };
+
+    /**
+     * Output port for the per-window counts. Requesting a unifier fails on purpose.
+     */
+    public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
+    {
+      @Override
+      public Unifier<KeyValPair<K, Integer>> getUnifier()
+      {
+        throw new UnsupportedOperationException("not partitionable");
+      }
+    };
+
+    /**
+     * Emits the count accumulated for every key, then resets the counts for the next window.
+     */
+    @Override
+    public void endWindow()
+    {
+      for (Map.Entry<K, MutableInt> entry : map.entrySet()) {
+        final K key = entry.getKey();
+        final Integer occurrences = entry.getValue().toInteger();
+        count.emit(new KeyValPair<>(key, occurrences));
+      }
+      map.clear();
+    }
+
+  }
+
+}
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
new file mode 100644
index 0000000..eed344b
--- /dev/null
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
@@ -0,0 +1,47 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Input operator that emits random doubles, at most {@code numTuples} of them per window.
+ */
+public class RandomNumberGenerator extends BaseOperator implements InputOperator
+{
+  /** Upper bound on the tuples emitted in a single window. */
+  private int numTuples = 100;
+  /** Number of {@link #emitTuples()} calls seen in the current window. */
+  private transient int count = 0;
+
+  public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
+
+  /**
+   * Resets the per-window call counter.
+   *
+   * @param windowId id of the window that is starting (unused)
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.count = 0;
+  }
+
+  /**
+   * Emits one value from {@link Math#random()} while the per-window limit has not been
+   * reached; the call counter advances on every invocation.
+   */
+  @Override
+  public void emitTuples()
+  {
+    final boolean belowLimit = this.count < this.numTuples;
+    this.count++;
+    if (belowLimit) {
+      out.emit(Math.random());
+    }
+  }
+
+  /**
+   * @return the number of tuples emitted every window
+   */
+  public int getNumTuples()
+  {
+    return this.numTuples;
+  }
+
+  /**
+   * Sets the number of tuples to be emitted every window.
+   * @param numTuples number of tuples
+   */
+  public void setNumTuples(int numTuples)
+  {
+    this.numTuples = numTuples;
+  }
+}
diff --git a/examples/exactly-once/src/main/resources/META-INF/properties.xml b/examples/exactly-once/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..876c39a
--- /dev/null
+++ b/examples/exactly-once/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<!-- Default properties packaged with the application under META-INF. -->
+<configuration>
+ <!--
+ <property>
+ <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+ <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+ </property>
+ -->
+ <!-- memory assigned to app master
+ <property>
+ <name>dt.attr.MASTER_MEMORY_MB</name>
+ <value>1024</value>
+ </property>
+ -->
+ <!--
+ NOTE(review): the two properties below are scoped to an application named
+ "MyFirstApplication" and one of them to an operator named "randomGenerator".
+ Application.java in this example declares the name "ExactlyOnceExampleApplication"
+ and adds operators named kafkaInput, count, store and console, so these entries look
+ like project-template leftovers. Confirm whether they should be renamed or removed.
+ -->
+ <property>
+ <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
+ <value>1000</value>
+ </property>
+ <property>
+ <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
+ <value>hello world: %s</value>
+ </property>
+</configuration>
+
diff --git a/examples/exactly-once/src/site/conf/my-app-conf1.xml b/examples/exactly-once/src/site/conf/my-app-conf1.xml
new file mode 100644
index 0000000..ccb2b66
--- /dev/null
+++ b/examples/exactly-once/src/site/conf/my-app-conf1.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!-- Optional launch configuration; the assembly descriptor copies it into /conf of the package. -->
+<configuration>
+ <!-- memory assigned to app master -->
+ <property>
+ <name>dt.attr.MASTER_MEMORY_MB</name>
+ <value>1024</value>
+ </property>
+ <!--
+ NOTE(review): this property targets application "MyFirstApplication" and operator
+ "randomGenerator", neither of which is declared by Application.java in this example
+ (application "ExactlyOnceExampleApplication"; operators kafkaInput, count, store,
+ console). Presumably a project-template leftover; confirm before relying on it.
+ -->
+ <property>
+ <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
+ <value>1000</value>
+ </property>
+</configuration>
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
new file mode 100644
index 0000000..c5aa69c
--- /dev/null
+++ b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
@@ -0,0 +1,124 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
+import com.datatorrent.contrib.kafka.KafkaTestProducer;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.example.myapexapp.Application;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Test the application in local mode.
+ */
+public class ApplicationTest
+{
+ private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+ private static final String KAFKA_TOPIC = "exactly-once-test";
+ private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+ private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+ private static final String TABLE_NAME = "WORDS";
+
+ @Before
+ public void beforeTest() throws Exception {
+ kafkaLauncher.baseDir = "target/" + this.getClass().getName();
+ FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
+ kafkaLauncher.startZookeeper();
+ kafkaLauncher.startKafkaServer();
+ kafkaLauncher.createTopic(0, KAFKA_TOPIC);
+
+ // setup hsqldb
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(DB_URL);
+ Statement stmt = con.createStatement();
+
+ String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + ")";
+ stmt.executeUpdate(createMetaTable);
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
+ stmt.executeUpdate(createTable);
+
+ }
+
+ @After
+ public void afterTest() {
+ kafkaLauncher.stopKafkaServer();
+ kafkaLauncher.stopZookeeper();
+ }
+
+ @Test
+ public void testApplication() throws Exception {
+ try {
+ // produce some test data
+ KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
+ String[] words = "count the words from kafka and store them in the db".split("\\s+");
+ p.setMessages(Lists.newArrayList(words));
+ new Thread(p).start();
+
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
+ conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+ conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+ conf.set("dt.operator.store.prop.store.databaseDriver", DB_DRIVER);
+ conf.set("dt.operator.store.prop.store.databaseUrl", DB_URL);
+
+ lma.prepareDAG(new Application(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync(); // test will terminate after results are available
+
+ HashSet<String> wordsSet = Sets.newHashSet(words);
+ Connection con = DriverManager.getConnection(DB_URL);
+ Statement stmt = con.createStatement();
+ int rowCount = 0;
+ long timeout = System.currentTimeMillis() + 30000; // 30s timeout
+ while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
+ Thread.sleep(1000);
+ String countQuery = "SELECT count(*) from " + TABLE_NAME;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ rowCount = resultSet.getInt(1);
+ resultSet.close();
+ LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
+ }
+ Assert.assertEquals("number of words", wordsSet.size(), rowCount);
+
+ lc.shutdown();
+
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+}
diff --git a/examples/exactly-once/src/test/resources/log4j.properties b/examples/exactly-once/src/test/resources/log4j.properties
new file mode 100644
index 0000000..dd5910b
--- /dev/null
+++ b/examples/exactly-once/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Logging configuration for tests: the root logger is at DEBUG and writes to CONSOLE only.
+log4j.rootLogger=DEBUG,CONSOLE
+
+# Console appender (the only appender attached to rootLogger above)
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+# Rolling file appender writing to /tmp/app.log; defined here but not listed in rootLogger
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+# Loggers explicitly set to info (the root logger above is DEBUG)
+log4j.logger.org=info
+log4j.logger.kafka.server=info
+log4j.logger.kafka.request.logger=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=info
--
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.