You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:20 UTC

[15/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/resources/META-INF/properties.xml b/demos/yahoofinance/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 9186b98..0000000
--- a/demos/yahoofinance/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<!--Configuration for chart demo  -->
-<configuration>
-  <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-  <property>
-    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
-    <value>1000</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceDemoWithChart.operator.AverageChart.xAxisLabel</name>
-    <value>TIME</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceDemoWithChart.operator.StockTickInput.tickers</name>
-    <value>IBM,GOOG,AAPL,YHOO</value>
-  </property>
-  <property>
-    <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
-    <value>256</value>
-  </property>
-  <property>
-    <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
-    <value>-Xmx128M</value>
-  </property>
-  <property>
-    <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
-    <value>256</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceDemo.operator.StockTickInput.tickers</name>
-    <value>IBM,GOOG,AAPL,YHOO</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceWithoutChartDemo.operator.StockTickInput.tickers</name>
-    <value>IBM,GOOG,AAPL,YHOO</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceDemoWithChart.operator.AverageChart.yAxisLabel</name>
-    <value>PRICE</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceDemoWithChart.operator.CandleStickChart.xAxisLabel</name>
-    <value>TIME</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceDemoWithChart.operator.CandleStickChart.yAxisLabel</name>
-    <value>PRICE</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceDemoWithChart.operator.AverageChart.attr.APPLICATION_WINDOW_COUNT
-    </name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.YahooFinanceDemoWithChart.operator.AverageChart.attr.APPLICATION_WINDOW_COUNT
-    </name>
-    <value>5</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java
deleted file mode 100644
index c038e61..0000000
--- a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-
-/**
- * Run Yahoo Finance application demo.
- *
- */
-public class ApplicationTest
-{
-
-  /**
-   * This will run for ever.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testApplication() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    new YahooFinanceApplication().populateDAG(lma.getDAG(), new Configuration(false));
-    LocalMode.Controller lc = lma.getController();
-    lc.run(10000);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java
deleted file mode 100644
index 7b134f5..0000000
--- a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-
-/**
- *
- */
-public class ApplicationWithDerbySQLTest
-{
-  private final transient Logger LOG = LoggerFactory.getLogger(ApplicationWithDerbySQLTest.class);
-  public ApplicationWithDerbySQLTest()
-  {
-  }
-
-  @Test
-  public void testSomeMethod() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    new ApplicationWithDerbySQL().populateDAG(lma.getDAG(), new Configuration(false));
-    LocalMode.Controller lc = lma.getController();
-
-    long start = System.currentTimeMillis();
-    lc.run();
-    long end = System.currentTimeMillis();
-    long time = end - start;
-    LOG.debug("Test used " + time + " ms");
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/test/resources/alert_create.json
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/test/resources/alert_create.json b/demos/yahoofinance/src/test/resources/alert_create.json
deleted file mode 100644
index 3059145..0000000
--- a/demos/yahoofinance/src/test/resources/alert_create.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{
-    "name":"alertName",
-    "streamName":"yahooFinance.outputPort",
-    "filter": { 
-        "class": "com.datatorrent.lib.util.JavaScriptFilterOperator", 
-        "properties": {
-            "setupScript":"function f() { return s0 == \"AAPL\" && l1 > 508 }",
-            "functionName":"f"
-        }
-    },
-    "escalation": { 
-        "class": "com.datatorrent.lib.util.AlertEscalationOperator", 
-        "properties": { 
-            "alertInterval":"5000",
-            "timeout":"10000"
-        }
-    },
-    "actions": [{ 
-        "outputPort":"alert",
-        "inputPort":"input",
-        "class":"com.datatorrent.lib.io.ConsoleOutputOperator"
-    }],
-    "saveAs":"firstAlert"
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/test/resources/log4j.properties b/demos/yahoofinance/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/yahoofinance/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/docs/CodingConventionsAndStyle.md
----------------------------------------------------------------------
diff --git a/docs/CodingConventionsAndStyle.md b/docs/CodingConventionsAndStyle.md
index 154f1de..baf2943 100644
--- a/docs/CodingConventionsAndStyle.md
+++ b/docs/CodingConventionsAndStyle.md
@@ -1,7 +1,7 @@
 Coding Conventions and Style
 ==============================
 
-Malhar GitHub repository contains operator library and demos built on top of the Apex platform. The code is open source, viewable and downloadable by all. Anyone can make code submissions to the repository to add new features or fix bugs. The process to do so is to first make a personal fork of the repository, make changes in the fork and then generate a pull request with the changes against the Malhar repository.
+Malhar GitHub repository contains operator library and examples built on top of the Apex platform. The code is open source, viewable and downloadable by all. Anyone can make code submissions to the repository to add new features or fix bugs. The process to do so is to first make a personal fork of the repository, make changes in the fork and then generate a pull request with the changes against the Malhar repository.
 
 Malhar administrators look at pull requests regularly and merge them into the repository. The pull requests have to follow certain guidelines in order to minimize the possibility of issues and problems arising from the merge, to keep the code maintainable going forward and to keep the licensing. The guidelines are as follows
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 7b6a441..2d6bc46 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -1,7 +1,7 @@
 Apache Apex Malhar
 ================================================================================
 
-Apache Apex Malhar is an open source operator and codec library that can be used with the [Apache Apex](http://apex.apache.org/) platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of demos applications, demonstrating operator features and capabilities.
+Apache Apex Malhar is an open source operator and codec library that can be used with the [Apache Apex](http://apex.apache.org/) platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of example applications, demonstrating operator features and capabilities.
 
 ![MalharDiagram](images/malhar-operators.png)
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/docs/operators/windowedOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/windowedOperator.md b/docs/operators/windowedOperator.md
index c0ac142..22012bf 100644
--- a/docs/operators/windowedOperator.md
+++ b/docs/operators/windowedOperator.md
@@ -247,7 +247,7 @@ The `WindowedMergeOperator` has its own watermark. Its watermark timestamp is th
 
 ## Usage Examples
 
-For an example usage of the `WindowedOperator` via the High level API, click [here](https://github.com/apache/apex-malhar/blob/master/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java).
+For an example usage of the `WindowedOperator` via the High level API, click [here](https://github.com/apache/apex-malhar/blob/master/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java).
 
 For an example usage of the `WindowedOperator` via the DAG level API, click [here](https://github.com/apache/apex-malhar/blob/master/library/src/test/java/org/apache/apex/malhar/lib/window/sample/wordcount/Application.java).
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/pom.xml
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/pom.xml b/examples/distributedistinct/pom.xml
new file mode 100644
index 0000000..510ea68
--- /dev/null
+++ b/examples/distributedistinct/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <artifactId>malhar-examples-distributedistinct</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Distributed Distinct Example</name>
+  <description></description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/assemble/appPackage.xml b/examples/distributedistinct/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/distributedistinct/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/Application.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/Application.java b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/Application.java
new file mode 100644
index 0000000..8eb8f2d
--- /dev/null
+++ b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/Application.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.algo.UniqueValueCount;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.stream.Counter;
+import com.datatorrent.lib.stream.StreamDuplicater;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This application demonstrates the UniqueValueCount operator. It uses an input operator which generates random key
+ * value pairs and simultaneously emits them to the UniqueValueCount operator and keeps track of the number of unique
+ * values per key to emit to the verifier.
+ *
+ * @since 1.0.4
+ */
+@ApplicationAnnotation(name = "ValueCount")
+public class Application implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    RandomKeyValGenerator randGen = dag.addOperator("RandomGenerator", new RandomKeyValGenerator());
+    UniqueValueCount<Integer> valCount = dag.addOperator("UniqueCounter", new UniqueValueCount<Integer>());
+    ConsoleOutputOperator consOut = dag.addOperator("Console", new ConsoleOutputOperator());
+    StreamDuplicater<KeyValPair<Integer, Integer>> dup = dag.addOperator("Duplicator", new StreamDuplicater<KeyValPair<Integer, Integer>>());
+    CountVerifier verifier = dag.addOperator("Verifier", new CountVerifier());
+    ConsoleOutputOperator successOutput = dag.addOperator("Success", new ConsoleOutputOperator());
+    successOutput.setStringFormat("Success %d");
+    ConsoleOutputOperator failureOutput = dag.addOperator("Failure", new ConsoleOutputOperator());
+    failureOutput.setStringFormat("Failure %d");
+
+    Counter successcounter = dag.addOperator("SuccessCounter", new Counter());
+    Counter failurecounter = dag.addOperator("FailureCounter", new Counter());
+
+    dag.addStream("Events", randGen.outport, valCount.input);
+    dag.addStream("Duplicates", valCount.output, dup.data);
+    dag.addStream("Unverified", dup.out1, verifier.recIn);
+    dag.addStream("EventCount", randGen.verport, verifier.trueIn);
+    dag.addStream("Verified", verifier.successPort, successcounter.input);
+    dag.addStream("Failed", verifier.failurePort, failurecounter.input);
+    dag.addStream("SuccessCount", successcounter.output, successOutput.input);
+    dag.addStream("FailedCount", failurecounter.output, failureOutput.input);
+    dag.addStream("Output", dup.out2, consOut.input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/CountVerifier.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/CountVerifier.java b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/CountVerifier.java
new file mode 100644
index 0000000..380ed4e
--- /dev/null
+++ b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/CountVerifier.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * An operator that takes two streams of Integer to Integer KeyValPairs and verifies that the two streams output the
+ * same KeyValPairs within a given window.
+ *
+ * @since 1.0.4
+ */
+public class CountVerifier implements Operator
+{
+  Map<Integer, Integer> trueCount = new HashMap<Integer, Integer>();
+  Map<Integer, Integer> receivedCount = new HashMap<Integer, Integer>();
+
+  public final transient DefaultInputPort<KeyValPair<Integer, Integer>> trueIn = new DefaultInputPort<KeyValPair<Integer, Integer>>()
+  {
+    @Override
+    public void process(KeyValPair<Integer, Integer> tuple)
+    {
+      trueCount.put(tuple.getKey(), tuple.getValue());
+    }
+  };
+
+  public final transient DefaultInputPort<KeyValPair<Integer, Integer>> recIn = new DefaultInputPort<KeyValPair<Integer, Integer>>()
+  {
+    @Override
+    public void process(KeyValPair<Integer, Integer> tuple)
+    {
+      receivedCount.put(tuple.getKey(), tuple.getValue());
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>();
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  @Override
+  public void beginWindow(long windowID)
+  {
+  }
+
+  /**
+   * Checks that the key to value pairs are the same and counts the number of pairs that are different. If there are
+   * failures, it will emit the number of failures to the failure port. Otherwise, it will emit the number of keys to
+   * the success port.
+   */
+  @Override
+  public void endWindow()
+  {
+    int failureCount = 0;
+    for (Map.Entry<Integer, Integer> e : receivedCount.entrySet()) {
+      Integer key = e.getKey();
+      if (!trueCount.get(key).equals(e.getValue())) {
+        failureCount++;
+      }
+    }
+    if (failureCount != 0) {
+      failurePort.emit(failureCount);
+    } else {
+      successPort.emit(trueCount.size());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/IntegerUniqueValueCountAppender.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/IntegerUniqueValueCountAppender.java b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/IntegerUniqueValueCountAppender.java
new file mode 100644
index 0000000..229eb32
--- /dev/null
+++ b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/IntegerUniqueValueCountAppender.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.datatorrent.lib.algo.UniqueValueCount;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This operator demonstrates {@link UniqueValueCountAppender} given that the keys and values of the preceding {@link UniqueValueCount} operator
+ * are both integers. <br/>
+ * It will keep track of the number of all the unique values emitted per key since the application starts.
+ *
+ * @since 1.0.4
+ */
+public class IntegerUniqueValueCountAppender extends UniqueValueCountAppender<Integer>
+{
+  @Override
+  public Object processResultSet(ResultSet resultSet)
+  {
+    Set<Integer> valSet = new HashSet<Integer>();
+    try {
+      while (resultSet.next()) {
+        valSet.add(resultSet.getInt(1));
+      }
+      return valSet;
+    } catch (SQLException e) {
+      throw new RuntimeException("while processing the result set", e);
+    }
+  }
+
+  @Override
+  protected void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException
+  {
+    getStatement.setInt(1, (Integer)key);
+  }
+
+  @Override
+  protected void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException
+  {
+    @SuppressWarnings("unchecked")
+    Set<Integer> valueSet = (Set<Integer>)value;
+    for (Integer val : valueSet) {
+      @SuppressWarnings("unchecked")
+      Set<Integer> currentVals = (Set<Integer>)get(key);
+      if (!currentVals.contains(val)) {
+        batch = true;
+        putStatement.setInt(1, (Integer)key);
+        putStatement.setInt(2, val);
+        putStatement.setLong(3, windowID);
+        putStatement.addBatch();
+      }
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    try {
+      Statement stmt = store.getConnection().createStatement();
+      String keySetQuery = "SELECT DISTINCT col1 FROM " + tableName;
+      ResultSet resultSet = stmt.executeQuery(keySetQuery);
+      while (resultSet.next()) {
+        int val = resultSet.getInt(1);
+        @SuppressWarnings("unchecked")
+        Set<Integer> valSet = (Set<Integer>)cacheManager.get(val);
+        output.emit(new KeyValPair<Object, Object>(val, valSet.size()));
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("While emitting tuples", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/RandomKeyValGenerator.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/RandomKeyValGenerator.java b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/RandomKeyValGenerator.java
new file mode 100644
index 0000000..b8e04be
--- /dev/null
+++ b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/RandomKeyValGenerator.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.KeyHashValPair;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Generates random KeyValPairs and optionally, keeps track of the number of unique values per pair to emit to the
+ * verification port.
+ *
+ * @since 1.0.4
+ */
+public class RandomKeyValGenerator implements InputOperator
+{
+
+  protected int numKeys = 50;
+  protected int numVals = 1000;
+  protected int tupleBlast = 1000;
+  protected Map<Integer, Set<Integer>> valhistory = new HashMap<Integer, Set<Integer>>();
+  private Random rand = new Random();
+  private boolean once;
+  private boolean clearHistory;
+  @OutputPortFieldAnnotation(optional = false)
+  public transient DefaultOutputPort<KeyValPair<Integer, Object>> outport = new DefaultOutputPort<KeyValPair<Integer, Object>>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public transient DefaultOutputPort<KeyValPair<Integer, Integer>> verport = new DefaultOutputPort<KeyValPair<Integer, Integer>>();
+
+  /**
+   * Ensures that the generator emits KeyValPairs once per window
+   */
+  @Override
+  public void beginWindow(long l)
+  {
+    once = false;
+  }
+
+  /**
+   * Emits the total count of unique values per key as KeyHashValPairs to the verification port
+   */
+  @Override
+  public void endWindow()
+  {
+    if (verport.isConnected()) {
+      for (Map.Entry<Integer, Set<Integer>> e : valhistory.entrySet()) {
+        verport.emit(new KeyHashValPair<Integer, Integer>(e.getKey(), e.getValue().size()));
+      }
+    }
+    if (clearHistory) {
+      valhistory.clear();
+    }
+  }
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  /**
+   * Emits random KeyValPairs and keeps track of the unique values per key.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (!once) {
+      int key;
+      int val;
+      for (int i = 0; i < tupleBlast; i++) {
+        key = rand.nextInt(numKeys);
+        val = rand.nextInt(numVals);
+        outport.emit(new KeyValPair<Integer, Object>(key, val));
+        if (verport.isConnected()) {
+          Set<Integer> count = valhistory.get(key);
+          if (count == null) {
+            Set<Integer> tempset = new HashSet<Integer>();
+            tempset.add(val);
+            valhistory.put(key, tempset);
+            LOG.debug("key {} val {}", key, tempset);
+          } else if (!valhistory.get(key).contains(val)) {
+            valhistory.get(key).add(val);
+          }
+        }
+      }
+      once = true;
+    }
+  }
+
+  /**
+   * @return the number of possible keys
+   */
+  public int getNumKeys()
+  {
+    return numKeys;
+  }
+
+  /**
+   * Sets the number of possible keys to numKeys
+   *
+   * @param numKeys
+   *          the new number of possible keys
+   */
+  public void setNumKeys(int numKeys)
+  {
+    this.numKeys = numKeys;
+  }
+
+  /**
+   * Returns the number of possible values that can be emitted
+   *
+   * @return the number of possible values that can be emitted
+   */
+  public int getNumVals()
+  {
+    return numVals;
+  }
+
+  /**
+   * Sets the number of possible values that can be emitted to numVals
+   *
+   * @param numVals
+   *          the number of possible values that can be emitted
+   */
+  public void setNumVals(int numVals)
+  {
+    this.numVals = numVals;
+  }
+
+  /**
+   * Sets the number of KeyValPairs to be emitted to tupleBlast
+   *
+   * @param tupleBlast
+   *          the new number of KeyValPairs to be emitted
+   */
+  public void setTupleBlast(int tupleBlast)
+  {
+    this.tupleBlast = tupleBlast;
+  }
+
+  /**
+   * tuple blast
+   * @return
+   */
+  public int getTupleBlast()
+  {
+    return tupleBlast;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(RandomKeyValGenerator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/StatefulApplication.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/StatefulApplication.java b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/StatefulApplication.java
new file mode 100644
index 0000000..550000e
--- /dev/null
+++ b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/StatefulApplication.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.algo.UniqueValueCount;
+import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * This application demonstrates the UniqueValueCount operator. It uses an input operator which generates random key
+ * value pairs and emits them to the UniqueValueCount operator which keeps track of the unique values per window. It
+ * then emits the values to the StatefulUniqueCount which uses a combination of a cache and database to keep track of
+ * the overall unique values and outputs the resulting unique value count to the ConsoleOutputOperator.
+ *
+ * @since 1.0.4
+ */
+@ApplicationAnnotation(name = "StatefulDistinctCount")
+public class StatefulApplication implements StreamingApplication
+{
+  @SuppressWarnings("unchecked")
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    RandomKeyValGenerator randGen = dag.addOperator("RandomGenerator", new RandomKeyValGenerator());
+    UniqueValueCount<Integer> valCount = dag.addOperator("UniqueCounter", new UniqueValueCount<Integer>());
+    ConsoleOutputOperator consOut = dag.addOperator("Console", new ConsoleOutputOperator());
+    IntegerUniqueValueCountAppender uniqueUnifier = dag.addOperator("StatefulUniqueCounter", new IntegerUniqueValueCountAppender());
+    dag.getOperatorMeta("StatefulUniqueCounter").getMeta(uniqueUnifier.input).getAttributes().put(Context.PortContext.STREAM_CODEC, new KeyBasedStreamCodec());
+
+    @SuppressWarnings("rawtypes")
+    DefaultOutputPort valOut = valCount.output;
+    @SuppressWarnings("rawtypes")
+    DefaultOutputPort uniqueOut = uniqueUnifier.output;
+
+    dag.addStream("Events", randGen.outport, valCount.input);
+    dag.addStream("Unified", valOut, uniqueUnifier.input);
+    dag.addStream("Result", uniqueOut, consOut.input);
+  }
+
+  public static class KeyBasedStreamCodec extends KryoSerializableStreamCodec<InternalCountOutput<Integer>> implements Serializable
+  {
+    @Override
+    public int getPartition(InternalCountOutput<Integer> t)
+    {
+      return t.getKey().hashCode();
+    }
+
+    private static final long serialVersionUID = 201407231527L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/UniqueValueCountAppender.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/UniqueValueCountAppender.java b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/UniqueValueCountAppender.java
new file mode 100644
index 0000000..c81f7b8
--- /dev/null
+++ b/examples/distributedistinct/src/main/java/org/apache/apex/examples/distributeddistinct/UniqueValueCountAppender.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.algo.UniqueValueCount;
+import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput;
+import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * <p>
+ * This operator supplements the {@link UniqueValueCount} operator by making it state-full.<br/>
+ * It helps to track unique values through out the lifetime of the application.
+ * </p>
+ *
+ * <p>
+ * The operator counts the number of values emitted per key by storing previously
+ * counted values in both a transient cache and in a persistent database. <br/>
+ * In case of a rollback, it will erase all values committed to the database
+ * in the windows greater than the activation window, then re-enter them as needed to keep it stateful.<br/>
+ * This operator, when appended to {@link UniqueValueCount} will keep track of the
+ * unique values emitted since the start of the application.
+ *
+ * @since 1.0.4
+ */
+public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedOperator<InternalCountOutput<V>> implements Partitioner<UniqueValueCountAppender<V>>
+{
+  protected Set<Integer> partitionKeys;
+  protected int partitionMask;
+  protected transient long windowID;
+  protected transient boolean batch;
+  @Min(1)
+  private int partitionCount = 1;
+
+  public UniqueValueCountAppender()
+
+  {
+    partitionKeys = Sets.newHashSet(0);
+    partitionMask = 0;
+  }
+
+  public void setPartitionCount(int partitionCount)
+  {
+    this.partitionCount = partitionCount;
+  }
+
+  public int getPartitionCount()
+  {
+    return partitionCount;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    LOGGER.debug("store properties {} {}", store.getDatabaseDriver(), store.getDatabaseUrl());
+    LOGGER.debug("table name {}", tableName);
+    windowID = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
+    try {
+      ResultSet resultSet = store.getConnection().createStatement().executeQuery("SELECT col1 FROM " + tableName + " WHERE col3 >= " + windowID);
+      PreparedStatement deleteStatement = store.getConnection().prepareStatement("DELETE FROM " + tableName + " WHERE col3 >= " + windowID + " AND col1 = ?");
+
+      Set<Object> deletedKeys = Sets.newHashSet();
+      while (resultSet.next()) {
+        Object key = resultSet.getObject(1);
+        if (partitionKeys.contains((key.hashCode() & partitionMask)) && !deletedKeys.contains(key)) {
+          deletedKeys.add(key);
+          deleteStatement.setObject(1, key);
+          deleteStatement.executeUpdate();
+        }
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void processTuple(InternalCountOutput<V> tuple)
+  {
+
+    Object key = getKeyFromTuple(tuple);
+    @SuppressWarnings("unchecked")
+    Set<Object> values = (Set<Object>)cacheManager.get(key);
+    if (values == null) {
+      values = Sets.newHashSet();
+    }
+    values.addAll(tuple.getInternalSet());
+    cacheManager.put(key, values);
+  }
+
+  @Override
+  protected String fetchInsertQuery()
+  {
+    return "INSERT INTO " + tableName + " (col1, col2, col3) VALUES (?, ?, ?)";
+  }
+
+  @Override
+  protected String fetchGetQuery()
+  {
+    return "select col2 from " + tableName + " where col1 = ?";
+  }
+
+  @Override
+  public Map<Object, Object> loadInitialData()
+  {
+    return null;
+  }
+
+  @Override
+  public void put(@Nonnull Object key, @Nonnull Object value)
+  {
+    try {
+      batch = false;
+      preparePutStatement(putStatement, key, value);
+      if (batch) {
+        putStatement.executeBatch();
+        putStatement.clearBatch();
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("while executing insert", e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  @Override
+  public void beginWindow(long windowID)
+  {
+    this.windowID = windowID;
+  }
+
+  @Override
+  protected Object getKeyFromTuple(InternalCountOutput<V> tuple)
+  {
+    return tuple.getKey();
+  }
+
+  @Override
+  public void putAll(Map<Object, Object> m)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public void remove(Object key)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  /**
+   * Assigns the partitions according to certain key values and keeps track of the
+   * keys that each partition will be processing so that in the case of a
+   * rollback, each partition will only clear the data that it is responsible for.
+   */
+  @Override
+  public Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> partitions, PartitioningContext context)
+  {
+    final int finalCapacity = DefaultPartition.getRequiredPartitionCount(context, this.partitionCount);
+    UniqueValueCountAppender<V> anOldOperator = partitions.iterator().next().getPartitionedInstance();
+    partitions.clear();
+
+    Collection<Partition<UniqueValueCountAppender<V>>> newPartitions = Lists.newArrayListWithCapacity(finalCapacity);
+
+    for (int i = 0; i < finalCapacity; i++) {
+      try {
+        @SuppressWarnings("unchecked")
+        UniqueValueCountAppender<V> statefulUniqueCount = this.getClass().newInstance();
+        DefaultPartition<UniqueValueCountAppender<V>> partition = new DefaultPartition<UniqueValueCountAppender<V>>(statefulUniqueCount);
+        newPartitions.add(partition);
+      } catch (Throwable cause) {
+        DTThrowable.rethrow(cause);
+      }
+    }
+
+    DefaultPartition.assignPartitionKeys(Collections.unmodifiableCollection(newPartitions), input);
+    int lPartitionMask = newPartitions.iterator().next().getPartitionKeys().get(input).mask;
+
+    for (Partition<UniqueValueCountAppender<V>> statefulUniqueCountPartition : newPartitions) {
+      UniqueValueCountAppender<V> statefulUniqueCountInstance = statefulUniqueCountPartition.getPartitionedInstance();
+
+      statefulUniqueCountInstance.partitionKeys = statefulUniqueCountPartition.getPartitionKeys().get(input).partitions;
+      statefulUniqueCountInstance.partitionMask = lPartitionMask;
+      statefulUniqueCountInstance.store = anOldOperator.store;
+      statefulUniqueCountInstance.tableName = anOldOperator.tableName;
+      statefulUniqueCountInstance.cacheManager = anOldOperator.cacheManager;
+    }
+    return newPartitions;
+  }
+
+  @Override
+  public void partitioned(Map<Integer, com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> partitions)
+  {
+  }
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UniqueValueCountAppender.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/main/resources/META-INF/properties.xml b/examples/distributedistinct/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..8742328
--- /dev/null
+++ b/examples/distributedistinct/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- 
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/ApplicationTest.java b/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/ApplicationTest.java
new file mode 100644
index 0000000..4861051
--- /dev/null
+++ b/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/ApplicationTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+
+public class ApplicationTest
+{
+  @Test
+  public void testSomeMethod() throws Exception
+  {
+    LocalMode.runApp(new Application(), 15000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/DistributedDistinctTest.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/DistributedDistinctTest.java b/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/DistributedDistinctTest.java
new file mode 100644
index 0000000..3ba39d6
--- /dev/null
+++ b/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/DistributedDistinctTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.OperatorContextTestHelper.TestIdOperatorContext;
+
+/**
+ * Test for {@link IntegerUniqueValueCountAppender} and {@link UniqueValueCountAppender}
+ */
+public class DistributedDistinctTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(DistributedDistinctTest.class);
+
+  private static final String APP_ID = "DistributedDistinctTest";
+  private static final int OPERATOR_ID = 0;
+
+  public static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+  public static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver";
+  public static final String TABLE_NAME = "Test_Lookup_Cache";
+
+  private static IntegerUniqueValueCountAppender valueCounter;
+  private static String applicationPath;
+
+  @Test
+  public void testProcess() throws Exception
+  {
+    insertValues();
+    Statement stmt = valueCounter.getStore().getConnection().createStatement();
+
+    ResultSet resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 1");
+    ArrayList<Integer> answersOne = new ArrayList<Integer>();
+    for (int i = 1; i < 16; i++) {
+      answersOne.add(i);
+    }
+    Assert.assertEquals(answersOne, processResult(resultSet));
+
+    resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 2");
+    ArrayList<Integer> answersTwo = new ArrayList<Integer>();
+    answersTwo.add(3);
+    answersTwo.add(6);
+    answersTwo.add(9);
+    for (int i = 11; i < 21; i++) {
+      answersTwo.add(i);
+    }
+    Assert.assertEquals(answersTwo, processResult(resultSet));
+
+    resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 3");
+    ArrayList<Integer> answersThree = new ArrayList<Integer>();
+    answersThree.add(2);
+    answersThree.add(4);
+    answersThree.add(6);
+    answersThree.add(8);
+    answersThree.add(10);
+    for (int i = 11; i < 21; i++) {
+      answersThree.add(i);
+    }
+    Assert.assertEquals(answersThree, processResult(resultSet));
+
+    valueCounter.teardown();
+  }
+
+  public static void insertValues()
+  {
+    logger.debug("start round 0");
+    valueCounter.beginWindow(0);
+    emitKeyVals(1, 1, 10, 1);
+    emitKeyVals(1, 5, 15, 1);
+    valueCounter.endWindow();
+    logger.debug("end round 0");
+
+    logger.debug("start round 1");
+    valueCounter.beginWindow(1);
+    emitKeyVals(2, 3, 15, 3);
+    emitKeyVals(3, 2, 20, 2);
+    emitKeyVals(3, 11, 20, 1);
+    valueCounter.endWindow();
+    logger.debug("end round 1");
+
+    logger.debug("start round 2");
+    valueCounter.beginWindow(2);
+    emitKeyVals(3, 2, 20, 2);
+    emitKeyVals(2, 11, 20, 1);
+    valueCounter.endWindow();
+    logger.debug("end round 2");
+  }
+
+  public static ArrayList<Integer> processResult(ResultSet resultSet)
+  {
+    ArrayList<Integer> tempList = new ArrayList<Integer>();
+    try {
+      while (resultSet.next()) {
+        tempList.add(resultSet.getInt(1));
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+    Collections.sort(tempList);
+    return tempList;
+  }
+
+  public static void emitKeyVals(int key, int start, int end, int increment)
+  {
+    int count = 0;
+    Set<Object> valSet = new HashSet<Object>();
+    for (int i = start; i <= end; i += increment) {
+      count++;
+      valSet.add(i);
+    }
+    valueCounter.processTuple(new InternalCountOutput<Integer>(key, count, valSet));
+  }
+
+  @Test
+  public void testSetup() throws Exception
+  {
+    insertValues();
+    Statement stmt = valueCounter.getStore().getConnection().createStatement();
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_ID, APP_ID);
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, 2L);
+
+    valueCounter.setup(new OperatorContextTestHelper.TestIdOperatorContext(0, attributes));
+
+    ResultSet resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 2");
+    ArrayList<Integer> answersAfterClear = new ArrayList<Integer>();
+    for (int i = 3; i < 16; i += 3) {
+      answersAfterClear.add(i);
+    }
+    Assert.assertEquals(answersAfterClear, processResult(resultSet));
+
+    resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 3");
+    ArrayList<Integer> answersThree = new ArrayList<Integer>();
+    answersThree.add(2);
+    answersThree.add(4);
+    answersThree.add(6);
+    answersThree.add(8);
+    answersThree.add(10);
+    for (int i = 11; i < 21; i++) {
+      answersThree.add(i);
+    }
+    Assert.assertEquals(answersThree, processResult(resultSet));
+    stmt.executeQuery("DELETE FROM " + TABLE_NAME);
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception
+  {
+    valueCounter = new IntegerUniqueValueCountAppender();
+    Class.forName(INMEM_DB_DRIVER).newInstance();
+    Connection con = DriverManager.getConnection(INMEM_DB_URL, new Properties());
+    Statement stmt = con.createStatement();
+    stmt.execute("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (col1 INTEGER, col2 INTEGER, col3 BIGINT)");
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_ID, APP_ID);
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, 0L);
+    valueCounter.setTableName(TABLE_NAME);
+    valueCounter.getStore().setDatabaseDriver(INMEM_DB_DRIVER);
+    valueCounter.getStore().setDatabaseUrl(INMEM_DB_URL);
+    TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
+    valueCounter.setup(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/StatefulApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/StatefulApplicationTest.java b/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/StatefulApplicationTest.java
new file mode 100644
index 0000000..5e50f6c
--- /dev/null
+++ b/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/StatefulApplicationTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+public class StatefulApplicationTest
+{
+
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(StatefulUniqueCountTest.INMEM_DB_DRIVER).newInstance();
+      Connection con = DriverManager.getConnection(StatefulUniqueCountTest.INMEM_DB_URL, new Properties());
+      Statement stmt = con.createStatement();
+      stmt.execute("CREATE TABLE IF NOT EXISTS " + StatefulUniqueCountTest.TABLE_NAME + " (col1 INTEGER, col2 INTEGER, col3 BIGINT)");
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.StatefulUniqueCounter.prop.tableName", "Test_Lookup_Cache");
+    conf.set("dt.operator.StatefulUniqueCounter.prop.store.dbUrl", "jdbc:hsqldb:mem:test;sql.syntax_mys=true");
+    conf.set("dt.operator.StatefulUniqueCounter.prop.store.dbDriver", "org.hsqldb.jdbcDriver");
+
+    lma.prepareDAG(new StatefulApplication(), conf);
+    lma.cloneDAG();
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+    lc.runAsync();
+
+    long now = System.currentTimeMillis();
+    while (System.currentTimeMillis() - now < 15000) {
+      Thread.sleep(1000);
+    }
+
+    lc.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/StatefulUniqueCountTest.java
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/StatefulUniqueCountTest.java b/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/StatefulUniqueCountTest.java
new file mode 100644
index 0000000..6821431
--- /dev/null
+++ b/examples/distributedistinct/src/test/java/org/apache/apex/examples/distributeddistinct/StatefulUniqueCountTest.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.distributeddistinct;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+
+import com.datatorrent.lib.algo.UniqueValueCount;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class StatefulUniqueCountTest
+{
+
+  public static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+  public static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver";
+  public static final String TABLE_NAME = "Test_Lookup_Cache";
+
+  static class KeyGen implements InputOperator
+  {
+
+    public transient DefaultOutputPort<KeyValPair<Integer, Object>> output = new DefaultOutputPort<KeyValPair<Integer, Object>>();
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+    }
+
+    public void emitKeyVals(int key, int start, int end, int increment)
+    {
+      for (int i = start; i <= end; i += increment) {
+        output.emit(new KeyValPair<Integer, Object>(key, i));
+      }
+    }
+
+    @Override
+    public void endWindow()
+    {
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+
+    }
+
+    @Override
+    public void teardown()
+    {
+
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      emitKeyVals(1, 1, 10, 1);
+      emitKeyVals(2, 3, 15, 3);
+      emitKeyVals(3, 2, 20, 2);
+      emitKeyVals(1, 5, 15, 1);
+      emitKeyVals(2, 11, 20, 1);
+      emitKeyVals(3, 11, 20, 1);
+    }
+  }
+
+  static class VerifyTable extends BaseOperator
+  {
+
+    private static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+    private static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver";
+    protected static final String TABLE_NAME = "Test_Lookup_Cache";
+
+    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+      }
+    };
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+    }
+
+    @Override
+    public void endWindow()
+    {
+      try {
+        Class.forName(INMEM_DB_DRIVER).newInstance();
+        Connection con = DriverManager.getConnection(INMEM_DB_URL, new Properties());
+        Statement stmt = con.createStatement();
+        ResultSet resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 1");
+        ArrayList<Integer> answersOne = new ArrayList<Integer>();
+        for (int i = 1; i < 16; i++) {
+          answersOne.add(i);
+        }
+        Assert.assertEquals(answersOne, processResult(resultSet));
+
+        resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 2");
+        ArrayList<Integer> answersTwo = new ArrayList<Integer>();
+        answersTwo.add(3);
+        answersTwo.add(6);
+        answersTwo.add(9);
+        for (int i = 11; i < 21; i++) {
+          answersTwo.add(i);
+        }
+        Assert.assertEquals(answersTwo, processResult(resultSet));
+
+        resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 3");
+        ArrayList<Integer> answersThree = new ArrayList<Integer>();
+        answersThree.add(2);
+        answersThree.add(4);
+        answersThree.add(6);
+        answersThree.add(8);
+        answersThree.add(10);
+        for (int i = 11; i < 21; i++) {
+          answersThree.add(i);
+        }
+        Assert.assertEquals(answersThree, processResult(resultSet));
+      } catch (Throwable e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+
+    }
+
+    public static ArrayList<Integer> processResult(ResultSet resultSet)
+    {
+      ArrayList<Integer> tempList = new ArrayList<Integer>();
+      try {
+        while (resultSet.next()) {
+          tempList.add(resultSet.getInt(1));
+        }
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+      Collections.sort(tempList);
+      return tempList;
+    }
+  }
+
+  public class Application implements StreamingApplication
+  {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      KeyGen keyGen = dag.addOperator("KeyGenerator", new KeyGen());
+      UniqueValueCount<Integer> valCount = dag.addOperator("ValueCounter", new UniqueValueCount<Integer>());
+      IntegerUniqueValueCountAppender uniqueUnifier = dag.addOperator("Unique", new IntegerUniqueValueCountAppender());
+      VerifyTable verifyTable = dag.addOperator("VerifyTable", new VerifyTable());
+
+      @SuppressWarnings("rawtypes")
+      DefaultOutputPort valOut = valCount.output;
+      @SuppressWarnings("rawtypes")
+      DefaultOutputPort uniqueOut = uniqueUnifier.output;
+      dag.addStream("DataIn", keyGen.output, valCount.input);
+      dag.addStream("UnifyWindows", valOut, uniqueUnifier.input);
+      dag.addStream("ResultsOut", uniqueOut, verifyTable.input);
+    }
+  }
+
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(INMEM_DB_DRIVER).newInstance();
+      Connection con = DriverManager.getConnection(INMEM_DB_URL, new Properties());
+      Statement stmt = con.createStatement();
+      stmt.execute("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (col1 INTEGER, col2 INTEGER, col3 BIGINT)");
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.Unique.prop.tableName", "Test_Lookup_Cache");
+    conf.set("dt.operator.Unique.prop.store.dbUrl", "jdbc:hsqldb:mem:test;sql.syntax_mys=true");
+    conf.set("dt.operator.Unique.prop.store.dbDriver", "org.hsqldb.jdbcDriver");
+
+    lma.prepareDAG(new Application(), conf);
+    lma.cloneDAG();
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+    lc.runAsync();
+
+    long now = System.currentTimeMillis();
+    while (System.currentTimeMillis() - now < 15000) {
+      Thread.sleep(1000);
+    }
+    lc.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/distributedistinct/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/distributedistinct/src/test/resources/log4j.properties b/examples/distributedistinct/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/distributedistinct/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/pom.xml
----------------------------------------------------------------------
diff --git a/examples/echoserver/pom.xml b/examples/echoserver/pom.xml
new file mode 100644
index 0000000..ae8a4be
--- /dev/null
+++ b/examples/echoserver/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>malhar-examples</artifactId>
+    <groupId>org.apache.apex</groupId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+  
+  <groupId>org.apache.apex</groupId>
+  <artifactId>malhar-examples-echoserver</artifactId>
+  <packaging>jar</packaging>
+
+  <!-- change these to the appropriate values -->
+  <name>Apache Apex Malhar EchoServer Example</name>
+  <description>An example server that echos data sent by a network client back to it</description>
+</project>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/assemble/appPackage.xml b/examples/echoserver/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/echoserver/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/Application.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/Application.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/Application.java
new file mode 100644
index 0000000..e59f160
--- /dev/null
+++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/Application.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * @since 2.1.0
+ */
+@ApplicationAnnotation(name = "EchoServer")
+public class Application implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    MessageReceiver receiver = dag.addOperator("Message Receiver", MessageReceiver.class);
+    MessageResponder responder = dag.addOperator("Message Responder", MessageResponder.class);
+    // Locality has to be container so that the operators use the same socket
+    dag.addStream("messages", receiver.messageOutput, responder.messageInput).setLocality(DAG.Locality.CONTAINER_LOCAL);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/Message.java
----------------------------------------------------------------------
diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/Message.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/Message.java
new file mode 100644
index 0000000..3790ea5
--- /dev/null
+++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/Message.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.echoserver;
+
+import java.net.SocketAddress;
+
+/**
+ * @since 2.1.0
+ */
+public class Message
+{
+  public String message;
+  public SocketAddress socketAddress;
+}