You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/08 11:07:10 UTC

[apex-malhar] 01/05: Kafka to JDBC exactly-once example.

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit 42bfa560fc9e11f049d6a18725e6f1a009e75483
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Feb 21 20:42:27 2016 -0800

    Kafka to JDBC exactly-once example.
---
 examples/exactly-once/src/assemble/appPackage.xml  |  43 +++++++
 .../java/com/example/myapexapp/Application.java    | 103 +++++++++++++++++
 .../example/myapexapp/RandomNumberGenerator.java   |  47 ++++++++
 .../src/main/resources/META-INF/properties.xml     |  24 ++++
 .../exactly-once/src/site/conf/my-app-conf1.xml    |  11 ++
 .../com/example/myapexapp/ApplicationTest.java     | 124 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  23 ++++
 7 files changed, 375 insertions(+)

diff --git a/examples/exactly-once/src/assemble/appPackage.xml b/examples/exactly-once/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/exactly-once/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id> <!-- assembly descriptor that lays out the application package as a single jar -->
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory> <!-- output directories below sit at the archive root -->
+  <fileSets>
+    <fileSet> <!-- this module's own jar, copied from target/ into /app -->
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet> <!-- everything under target/deps goes to /lib (presumably the copied dependencies; confirm in the pom) -->
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet> <!-- XML files from src/site/conf go to /conf -->
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet> <!-- META-INF resources (including properties.xml) go to /META-INF -->
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet> <!-- optional extra files from src/main/resources/app, merged into /app -->
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet> <!-- optional files from src/main/resources/resources go to /resources -->
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
new file mode 100644
index 0000000..8050d67
--- /dev/null
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
@@ -0,0 +1,103 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.BaseUniqueKeyCounter;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Streaming application that counts the words read from Kafka and sends the per-window counts
+ * to a JDBC output operator and to the console.
+ */
+@ApplicationAnnotation(name="ExactlyOnceExampleApplication")
+public class Application implements StreamingApplication
+{
+
+  /**
+   * Adds the four operators (kafkaInput, count, store, console) and the two streams connecting them.
+   *
+   * @param dag  DAG being populated
+   * @param conf configuration handed in by the caller; not read by this method
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    KafkaSinglePortStringInputOperator source = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
+    source.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
+    UniqueCounter<String> counter = dag.addOperator("count", new UniqueCounter<String>());
+
+    CountStoreOperator jdbcSink = dag.addOperator("store", new CountStoreOperator());
+    jdbcSink.setStore(new JdbcTransactionalStore());
+
+    ConsoleOutputOperator consoleSink = dag.addOperator("console", new ConsoleOutputOperator());
+
+    // "words" feeds Kafka messages into the counter; "counts" fans the counter output out to both sinks
+    dag.addStream("words", source.outputPort, counter.data);
+    dag.addStream("counts", counter.count, jdbcSink.input, consoleSink.input);
+  }
+
+  /**
+   * JDBC output operator that writes each (word, count) pair to the {@code words} table using {@link #SQL}.
+   */
+  public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
+  {
+    /**
+     * MERGE statement: adds the incoming count to the row of an existing word, or inserts a new row.
+     */
+    public static final String SQL =
+        "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
+        + " ON (words.word=I.word)"
+        + " WHEN MATCHED THEN UPDATE SET words.wcount = words.wcount + I.wcount"
+        + " WHEN NOT MATCHED THEN INSERT (word, wcount) VALUES (I.word, I.wcount)";
+
+    /**
+     * @return the statement text in {@link #SQL}
+     */
+    @Override
+    protected String getUpdateCommand()
+    {
+      return SQL;
+    }
+
+    /**
+     * Binds the word to parameter 1 and its count to parameter 2 of the MERGE statement.
+     *
+     * @param statement prepared statement to bind to
+     * @param tuple     word/count pair being written
+     * @throws SQLException if binding a parameter fails
+     */
+    @Override
+    protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException
+    {
+      final String word = tuple.getKey();
+      statement.setString(1, word);
+      final int wordCount = tuple.getValue();
+      statement.setInt(2, wordCount);
+    }
+  }
+
+  /**
+   * Counts occurrences of each key within a window and emits one (key, count) pair per key
+   * when the window ends.
+   *
+   * @param <K> type of the counted keys
+   */
+  public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
+  {
+    /**
+     * Input port; every tuple received is passed to {@code processTuple}.
+     */
+    public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
+    {
+      /**
+       * Hands the incoming tuple to the counter.
+       */
+      @Override
+      public void process(K tuple)
+      {
+        processTuple(tuple);
+      }
+
+    };
+
+    /**
+     * Output port for the (key, count) pairs; requesting a unifier from it is rejected.
+     */
+    public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
+    {
+      @Override
+      public Unifier<KeyValPair<K, Integer>> getUnifier()
+      {
+        throw new UnsupportedOperationException("not partitionable");
+      }
+    };
+
+    /**
+     * Emits the count collected for every key, then empties the map so the next window starts from zero.
+     */
+    @Override
+    public void endWindow()
+    {
+      for (Map.Entry<K, MutableInt> entry : map.entrySet()) {
+        final K key = entry.getKey();
+        final Integer total = entry.getValue().toInteger();
+        final KeyValPair<K, Integer> pair = new KeyValPair<>(key, total);
+        count.emit(pair);
+      }
+      map.clear();
+    }
+
+  }
+
+}
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
new file mode 100644
index 0000000..eed344b
--- /dev/null
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
@@ -0,0 +1,47 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Input operator that emits values from {@link Math#random()}, at most {@code numTuples} per window.
+ */
+public class RandomNumberGenerator extends BaseOperator implements InputOperator
+{
+  private int numTuples = 100;
+  private transient int count = 0;
+
+  /**
+   * Output port on which the random values are emitted.
+   */
+  public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
+
+  /**
+   * Resets the per-window emit counter.
+   *
+   * @param windowId id of the window that is starting (not used)
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.count = 0;
+  }
+
+  /**
+   * Emits one random value per call until {@code numTuples} calls have been made in the current window.
+   */
+  @Override
+  public void emitTuples()
+  {
+    // the counter advances on every call, whether or not a tuple is emitted
+    final boolean underLimit = this.count < this.numTuples;
+    this.count++;
+    if (underLimit) {
+      this.out.emit(Math.random());
+    }
+  }
+
+  /**
+   * @return the maximum number of tuples emitted per window
+   */
+  public int getNumTuples()
+  {
+    return this.numTuples;
+  }
+
+  /**
+   * Sets the number of tuples to be emitted every window.
+   * @param numTuples number of tuples
+   */
+  public void setNumTuples(int numTuples)
+  {
+    this.numTuples = numTuples;
+  }
+}
diff --git a/examples/exactly-once/src/main/resources/META-INF/properties.xml b/examples/exactly-once/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..876c39a
--- /dev/null
+++ b/examples/exactly-once/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<configuration>
+  <!-- Template for setting an operator property (kept commented out):
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+  <!-- memory assigned to app master
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+
+  NOTE(review): the two active properties below are scoped to an application named
+  "MyFirstApplication", and the first one to an operator named "randomGenerator".
+  Application.java declares the application name "ExactlyOnceExampleApplication" and adds the
+  operators kafkaInput, count, store and console. Confirm whether these entries are leftover
+  template defaults that should be renamed or removed.
+  -->
+  <property>
+    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
+    <value>1000</value>
+  </property>
+  <property>
+    <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
+    <value>hello world: %s</value>
+  </property>
+</configuration>
+
diff --git a/examples/exactly-once/src/site/conf/my-app-conf1.xml b/examples/exactly-once/src/site/conf/my-app-conf1.xml
new file mode 100644
index 0000000..ccb2b66
--- /dev/null
+++ b/examples/exactly-once/src/site/conf/my-app-conf1.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<configuration>
+  <property> <!-- memory for the application master, in MB -->
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+  <property> <!-- NOTE(review): scoped to application "MyFirstApplication" and operator
+       "randomGenerator"; Application.java declares "ExactlyOnceExampleApplication" and adds no
+       operator of that name. Confirm whether this is a leftover template default. -->
+    <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
+    <value>1000</value>
+  </property>
+</configuration>
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
new file mode 100644
index 0000000..c5aa69c
--- /dev/null
+++ b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
@@ -0,0 +1,124 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.myapexapp;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
+import com.datatorrent.contrib.kafka.KafkaTestProducer;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.example.myapexapp.Application;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Test the application in local mode.
+ */
+public class ApplicationTest
+{
+  private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
+  private static final String KAFKA_TOPIC = "exactly-once-test";
+  private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+  private static final String TABLE_NAME = "WORDS";
+
+  @Before
+  public void beforeTest() throws Exception {
+    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
+    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
+    kafkaLauncher.startZookeeper();
+    kafkaLauncher.startKafkaServer();
+    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
+
+    // setup hsqldb
+    Class.forName(DB_DRIVER).newInstance();
+
+    Connection con = DriverManager.getConnection(DB_URL);
+    Statement stmt = con.createStatement();
+
+    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+        + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+        + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+        + ")";
+    stmt.executeUpdate(createMetaTable);
+
+    String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+        + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
+    stmt.executeUpdate(createTable);
+
+  }
+
+  @After
+  public void afterTest() {
+    kafkaLauncher.stopKafkaServer();
+    kafkaLauncher.stopZookeeper();
+  }
+
+  @Test
+  public void testApplication() throws Exception {
+    try {
+      // produce some test data
+      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
+      String[] words = "count the words from kafka and store them in the db".split("\\s+");
+      p.setMessages(Lists.newArrayList(words));
+      new Thread(p).start();
+
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
+      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+      conf.set("dt.operator.store.prop.store.databaseDriver", DB_DRIVER);
+      conf.set("dt.operator.store.prop.store.databaseUrl", DB_URL);
+
+      lma.prepareDAG(new Application(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync(); // test will terminate after results are available
+
+      HashSet<String> wordsSet = Sets.newHashSet(words);
+      Connection con = DriverManager.getConnection(DB_URL);
+      Statement stmt = con.createStatement();
+      int rowCount = 0;
+      long timeout = System.currentTimeMillis() + 30000; // 30s timeout
+      while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
+        Thread.sleep(1000);
+        String countQuery = "SELECT count(*) from " + TABLE_NAME;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        rowCount = resultSet.getInt(1);
+        resultSet.close();
+        LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
+      }
+      Assert.assertEquals("number of words", wordsSet.size(), rowCount);
+
+      lc.shutdown();
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
diff --git a/examples/exactly-once/src/test/resources/log4j.properties b/examples/exactly-once/src/test/resources/log4j.properties
new file mode 100644
index 0000000..dd5910b
--- /dev/null
+++ b/examples/exactly-once/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Test logging setup: the root logger runs at DEBUG and writes to the CONSOLE appender only.
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+# Rolling file appender writing to /tmp/app.log; defined here but not attached to rootLogger above.
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+# Per-logger levels: these raise the threshold to INFO for the listed logger hierarchies.
+log4j.logger.org=info
+log4j.logger.kafka.server=info
+log4j.logger.kafka.request.logger=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=info

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.