You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:08 UTC

[03/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
new file mode 100644
index 0000000..7bbb8ec
--- /dev/null
+++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+public class SQLApplicationWithModelFileTest
+{
+  private TimeZone defaultTZ;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithModelFile.xml"));
+
+    SQLApplicationWithModelFile app = new SQLApplicationWithModelFile();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+
+    PrintStream originalSysout = System.out;
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(baos));
+
+    lc.runAsync();
+    waitTillStdoutIsPopulated(baos, 30000);
+    lc.shutdown();
+
+    System.setOut(originalSysout);
+
+    String[] sout = baos.toString().split(System.lineSeparator());
+    Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+    String[] actualLines = filter.toArray(new String[filter.size()]);
+    Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+    Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+    Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+    Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+    Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+    Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+  }
+
+  public static boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException,
+    IOException
+  {
+    long now = System.currentTimeMillis();
+    Collection<String> filter = Lists.newArrayList();
+    while (System.currentTimeMillis() - now < timeout) {
+      baos.flush();
+      String[] sout = baos.toString().split(System.lineSeparator());
+      filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+      if (filter.size() != 0) {
+        break;
+      }
+
+      Thread.sleep(500);
+    }
+
+    return (filter.size() != 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/resources/input.csv
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/resources/input.csv b/examples/sql/src/test/resources/input.csv
new file mode 100644
index 0000000..c4786d1
--- /dev/null
+++ b/examples/sql/src/test/resources/input.csv
@@ -0,0 +1,6 @@
+15/02/2016 10:15:00 +0000,1,paint1,11
+15/02/2016 10:16:00 +0000,2,paint2,12
+15/02/2016 10:17:00 +0000,3,paint3,13
+15/02/2016 10:18:00 +0000,4,paint4,14
+15/02/2016 10:19:00 +0000,5,paint5,15
+15/02/2016 10:10:00 +0000,6,abcde6,16

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/resources/log4j.properties b/examples/sql/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8ea3cfe
--- /dev/null
+++ b/examples/sql/src/test/resources/log4j.properties
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=WARN
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.calcite=WARN
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.kafka.consumer=WARN

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml
new file mode 100644
index 0000000..76924c9
--- /dev/null
+++ b/examples/twitter/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-twitter</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Twitter Example</name>
+  <description>Twitter Rolling Top Words application demonstrates real-time computations over a sliding window.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <!-- required by twitter example -->
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+      <version>4.0.6</version>
+    </dependency>
+    <dependency>
+      <!-- required by twitter example -->
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+      <version>4.0.6</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+      <version>0.94.20</version>
+      <type>jar</type>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>5.1.22</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-contrib</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-kinesis</artifactId>
+      <version>1.9.10</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.4.4</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>6.6.4</version>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/twitter/src/assemble/appPackage.xml b/examples/twitter/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/twitter/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
new file mode 100644
index 0000000..be7edfb
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.net.URI;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
+import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
+import com.datatorrent.contrib.kinesis.KinesisStringOutputOperator;
+import com.datatorrent.contrib.kinesis.ShardManager;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+
+/**
+ * Twitter Example Application: <br>
+ * This example application samples random public status from twitter, send to Hashtag
+ * extractor and extract the status and send it into kinesis <br>
+ * Get the records from kinesis and converts into Hashtags. Top 10 Hashtag(s) mentioned in
+ * tweets in last 5 mins are displayed on every window count (500ms).<br>
+ * <br>
+ *
+ * Real Time Calculation :<br>
+ * This application calculates top 10 Hashtag mentioned in tweets in last 5
+ * minutes across a 1% random tweet sampling on a rolling window basis.<br>
+ * <br>
+ * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>,
+ * <a href="https://http://aws.amazon.com/">AWS Account</a> and configure the authentication details.
+ * For launch from CLI, those go into ~/.dt/dt-site.xml:
+ * <pre>
+ * {@code
+ * <?xml version="1.0" encoding="UTF-8"?>
+ * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+ * <configuration>
+ *
+ *   <property> <name>dt.operator.TweetSampler.prop.consumerKey</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.prop.accessToken</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.FromKinesis.streamName</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.FromKinesis.accessKey</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.FromKinesis.secretKey</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.ToKinesis.streamName</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.ToKinesis.accessKey</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.ToKinesis.secretKey</name>
+ *   <value>TBD</value> </property>
+ *
+ * </configuration>
+ * }
+ * </pre>
+ * Custom Attributes: <br>
+ * <b>topCounts operator : <b>
+ * <ul>
+ * <li>Top Count : 10, number of top unique Hashtag to be reported.</li>
+ * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
+ * <li>window slide value : 1</li>
+ * </ul>
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see similar output on console as below:
+ *
+ * <pre>
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * 2013-06-17 14:38:55,201 [main] INFO  stram.StramLocalCluster run - Application finished.
+ * 2013-06-17 14:38:55,201 [container-2] INFO  stram.StramChild processHeartbeatResponse - Received shutdown request
+ * </pre>
+ *
+ * Scaling Options : <br>
+ * User can scale application by setting intial partition size > 1 on count
+ * unique operator. <br>
+ * <br>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 500ms(default) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The twitterFeed operator : </b> This operator samples random public
+ * statues from twitter and emits to application. <br>
+ * Class : org.apache.apex.examples.twitter.TwitterSampleInput <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtag from
+ * random sampled statues from twitter. <br>
+ * Class : {@link TwitterStatusHashtagExtractor} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The outputOp operator : </b> This operator sent the tags into the kinesis. <br>
+ * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br>
+ * </li>
+ * <li><b>The inputOp operator : </b> This operator fetches the records from kinesis and
+ * converts into hastags and emits them. <br>
+ * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br>
+ * </li>
+ * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each
+ * Hashtag extracted from random samples. <br>
+ * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b> The topCounts operator : </b> This operator caluculates top Hashtag in last 1
+ * min sliding window count 1. <br>
+ * Class : com.datatorrent.lib.algo.WindowedTopCounter <br>
+ * StateFull : Yes, sliding window count 120 (1 min) <br>
+ * </li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). <br>
+ * </li>
+ * </ul>
+ *
+ * @since 2.0.0
+ */
+@ApplicationAnnotation(name = "TwitterKinesisExample")
+public class KinesisHashtagsApplication implements StreamingApplication
+{
+  private final Locality locality = null;
+
+  private InputPort<Object> consoleOutput(DAG dag, String operatorName)
+  {
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+    if (!StringUtils.isEmpty(gatewayAddress)) {
+      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+      String topic = "examples.twitter." + operatorName;
+      //LOG.info("WebSocket with gateway at: {}", gatewayAddress);
+      PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
+      wsOut.setUri(uri);
+      wsOut.setTopic(topic);
+      return wsOut.input;
+    }
+    ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator());
+    operator.setStringFormat(operatorName + ": %s");
+    return operator.input;
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Setup the operator to get the data from twitter sample stream injected into the system.
+    TwitterSampleInput twitterFeed = new TwitterSampleInput();
+    twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
+
+    //  Setup the operator to get the Hashtags extracted from the twitter statuses
+    TwitterStatusHashtagExtractor HashtagExtractor = dag.addOperator("HashtagExtractor", TwitterStatusHashtagExtractor.class);
+
+    //Setup the operator send the twitter statuses to kinesis
+    KinesisStringOutputOperator outputOp = dag.addOperator("ToKinesis", new KinesisStringOutputOperator());
+    outputOp.setBatchSize(500);
+
+    // Feed the statuses from feed into the input of the Hashtag extractor.
+    dag.addStream("TweetStream", twitterFeed.status, HashtagExtractor.input).setLocality(Locality.CONTAINER_LOCAL);
+    //  Start counting the Hashtags coming out of Hashtag extractor
+    dag.addStream("SendToKinesis", HashtagExtractor.hashtags, outputOp.inputPort).setLocality(locality);
+
+    //------------------------------------------------------------------------------------------
+
+    KinesisStringInputOperator inputOp = dag.addOperator("FromKinesis", new KinesisStringInputOperator());
+    ShardManager shardStats = new ShardManager();
+    inputOp.setShardManager(shardStats);
+    inputOp.getConsumer().setRecordsLimit(600);
+    inputOp.setStrategy(AbstractKinesisInputOperator.PartitionStrategy.MANY_TO_ONE.toString());
+
+    // Setup a node to count the unique Hashtags within a window.
+    UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>());
+
+    // Get the aggregated Hashtag counts and count them over last 5 mins.
+    WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
+    topCounts.setTopCount(10);
+    topCounts.setSlidingWindowWidth(600);
+    topCounts.setDagWindowWidth(1);
+
+    dag.addStream("TwittedHashtags", inputOp.outputPort, uniqueCounter.data).setLocality(locality);
+
+    // Count unique Hashtags
+    dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input).setLocality(locality);
+    // Count top 10
+    dag.addStream("TopHashtags", topCounts.output, consoleOutput(dag, "topHashtags")).setLocality(locality);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java
new file mode 100644
index 0000000..dfcac81
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.io.Serializable;
+
+/**
+ * Developed for a example<br>
+ *
+ * @param <T> Type of object for which sliding window is being maintained.
+ * @since 0.3.2
+ */
+public class SlidingContainer<T> implements Serializable
+{
+  private static final long serialVersionUID = 201305291751L;
+  T identifier;
+  int totalCount;
+  int position;
+  int[] windowedCount;
+
+  @SuppressWarnings("unused")
+  private SlidingContainer()
+  {
+    /* needed for Kryo serialization */
+  }
+
+  public SlidingContainer(T identifier, int windowCount)
+  {
+    this.identifier = identifier;
+    this.totalCount = 0;
+    this.position = 0;
+    windowedCount = new int[windowCount];
+  }
+
+  public void adjustCount(int i)
+  {
+    windowedCount[position] += i;
+  }
+
+  public void slide()
+  {
+    int currentCount = windowedCount[position];
+    position = position == windowedCount.length - 1 ? 0 : position + 1;
+    totalCount += currentCount - windowedCount[position];
+    windowedCount[position] = 0;
+  }
+
+  @Override
+  public String toString()
+  {
+    return identifier + " => " + totalCount;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java
new file mode 100644
index 0000000..62ec6cf
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+
+import twitter4j.Status;
+
+/**
+ * An application which connects to Twitter Sample Input and stores all the
+ * tweets with their usernames in a mysql database. Please review the docs
+ * for TwitterTopCounterApplication to setup your twitter credentials. You
+ * may also be able to change JDBCStore credentials using config file.
+ *
+ * You will also need to create appropriate database and tables with the
+ * following schema, also included in mysql.sql in resources:
+ * <pre>
+ * DROP TABLE if exists tweets;
+ * CREATE TABLE tweets (
+ * window_id LONG NOT NULL,
+ * creation_date DATE,
+ * text VARCHAR(256) NOT NULL,
+ * userid VARCHAR(40) NOT NULL,
+ * KEY ( userid, creation_date)
+ * );
+ *
+ * drop table if exists dt_window_id_tracker;
+ * CREATE TABLE dt_window_id_tracker (
+ * dt_application_id VARCHAR(100) NOT NULL,
+ * dt_operator_id int(11) NOT NULL,
+ * dt_window_id bigint NOT NULL,
+ * UNIQUE (dt_application_id, dt_operator_id, dt_window_id)
+ * )  ENGINE=MyISAM DEFAULT CHARSET=latin1;
+ * </pre>
+ *
+ * @since 0.9.4
+ */
+@ApplicationAnnotation(name = "TwitterDumpExample")
+public class TwitterDumpApplication implements StreamingApplication
+{
+  public static class Status2Database extends AbstractJdbcTransactionableOutputOperator<Status>
+  {
+    public static final String INSERT_STATUS_STATEMENT = "insert into tweets (window_id, creation_date, text, userid) values (?, ?, ?, ?)";
+
+    public Status2Database()
+    {
+      store.setMetaTable("dt_window_id_tracker");
+      store.setMetaTableAppIdColumn("dt_application_id");
+      store.setMetaTableOperatorIdColumn("dt_operator_id");
+      store.setMetaTableWindowColumn("dt_window_id");
+    }
+
+    @Nonnull
+    @Override
+    protected String getUpdateCommand()
+    {
+      return INSERT_STATUS_STATEMENT;
+    }
+
+    @Override
+    protected void setStatementParameters(PreparedStatement statement, Status tuple) throws SQLException
+    {
+      statement.setLong(1, currentWindowId);
+
+      statement.setDate(2, new java.sql.Date(tuple.getCreatedAt().getTime()));
+      statement.setString(3, tuple.getText());
+      statement.setString(4, tuple.getUser().getScreenName());
+      statement.addBatch();
+    }
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");
+
+    TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());
+
+    //ConsoleOutputOperator dbWriter = dag.addOperator("DatabaseWriter", new ConsoleOutputOperator());
+
+    Status2Database dbWriter = dag.addOperator("DatabaseWriter", new Status2Database());
+    dbWriter.getStore().setDatabaseDriver("com.mysql.jdbc.Driver");
+    dbWriter.getStore().setDatabaseUrl("jdbc:mysql://node6.morado.com:3306/twitter");
+    dbWriter.getStore().setConnectionProperties("user:twitter");
+
+    dag.addStream("Statuses", twitterStream.status, dbWriter.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java
new file mode 100644
index 0000000..3169132
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+
+import twitter4j.Status;
+
+/**
+ * An application which connects to Twitter Sample Input and stores all the
+ * tweets with their usernames in a hbase database. Please review the docs
+ * for TwitterTopCounterApplication to setup your twitter credentials.
+ *
+ * You need to create the HBase table to run this example. Table name can be
+ * configured but columnfamily must be 'cf' to make this example simple and complied
+ * with the mysql based example.
+ * create 'tablename', 'cf'
+ *
+ * </pre>
+ *
+ * @since 1.0.2
+ */
+@ApplicationAnnotation(name = "TwitterDumpHBaseExample")
+public class TwitterDumpHBaseApplication implements StreamingApplication
+{
+
+  public static class Status2Hbase extends AbstractHBasePutOutputOperator<Status>
+  {
+
+    @Override
+    public Put operationPut(Status t)
+    {
+      Put put = new Put(ByteBuffer.allocate(8).putLong(t.getCreatedAt().getTime()).array());
+      put.add("cf".getBytes(), "text".getBytes(), t.getText().getBytes());
+      put.add("cf".getBytes(), "userid".getBytes(), t.getText().getBytes());
+      return put;
+    }
+
+  }
+
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");
+
+    TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());
+
+    Status2Hbase hBaseWriter = dag.addOperator("DatabaseWriter", new Status2Hbase());
+
+    dag.addStream("Statuses", twitterStream.status, hBaseWriter.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java
new file mode 100644
index 0000000..7b468ec
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import twitter4j.HashtagEntity;
+import twitter4j.Status;
+
+/**
+ * <p>TwitterStatusHashtagExtractor class.</p>
+ *
+ * @since 1.0.2
+ */
+public class TwitterStatusHashtagExtractor extends BaseOperator
+{
+  public final transient DefaultOutputPort<String> hashtags = new DefaultOutputPort<String>();
+  public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>()
+  {
+    @Override
+    public void process(Status status)
+    {
+      HashtagEntity[] entities = status.getHashtagEntities();
+      if (entities != null) {
+        for (HashtagEntity he : entities) {
+          if (he != null) {
+            hashtags.emit(he.getText());
+          }
+        }
+      }
+    }
+
+  };
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java
new file mode 100644
index 0000000..40a7d3d
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import twitter4j.Status;
+import twitter4j.URLEntity;
+
+/**
+ * <p>TwitterStatusURLExtractor class.</p>
+ *
+ * @since 0.3.2
+ */
+public class TwitterStatusURLExtractor extends BaseOperator
+{
+  public final transient DefaultOutputPort<String> url = new DefaultOutputPort<String>();
+  public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>()
+  {
+    @Override
+    public void process(Status status)
+    {
+      URLEntity[] entities = status.getURLEntities();
+      if (entities != null) {
+        for (URLEntity ue: entities) {
+          if (ue != null) { // see why we intermittently get NPEs
+            url.emit((ue.getExpandedURL() == null ? ue.getURL() : ue.getExpandedURL()).toString());
+          }
+        }
+      }
+    }
+  };
+
+  private static final Logger LOG = LoggerFactory.getLogger(TwitterStatusURLExtractor.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java
new file mode 100644
index 0000000..6581b76
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>TwitterStatusWordExtractor class.</p>
+ *
+ * @since 0.3.2
+ */
+public class TwitterStatusWordExtractor extends BaseOperator
+{
+  public HashSet<String> filterList;
+
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String text)
+    {
+      String[] strs = text.split(" ");
+      if (strs != null) {
+        for (String str : strs) {
+          if (str != null && !filterList.contains(str) ) {
+            output.emit(str);
+          }
+        }
+      }
+    }
+  };
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    this.filterList = new HashSet<String>(Arrays.asList(new String[]{"", " ","I","you","the","a","to","as","he","him","his","her","she","me","can","for","of","and","or","but",
+      "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when",
+      "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"}));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
new file mode 100644
index 0000000..ee43383
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.OutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+
+/**
+ * Twitter Example Application: <br>
+ * This example application samples random public status from twitter, send to url
+ * extractor. <br>
+ * Top 10 url(s) mentioned in tweets in last 5 mins are displayed on every
+ * window count (500ms).<br>
+ * <br>
+ *
+ * Real Time Calculation :<br>
+ * This application calculates top 10 url mentioned in tweets in last 5
+ * minutes across a 1% random tweet sampling on a rolling window basis.<br>
+ * <br>
+ * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>
+ * and configure the authentication. For launch from CLI, those go into ~/.dt/dt-site.xml:
+ * <pre>
+ * {@code
+ * <?xml version="1.0" encoding="UTF-8"?>
+ * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+ * <configuration>
+ *
+ *   <property> <name>dt.operator.TweetSampler.consumerKey</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.consumerSecret</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.accessToken</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.accessTokenSecret</name>
+ *   <value>TBD</value> </property>
+ * </configuration>
+ * }
+ * </pre>
+ * Custom Attributes: <br>
+ * <b>topCounts operator : <b>
+ * <ul>
+ * <li>Top Count : 10, number of top unique url to be reported.</li>
+ * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
+ * <li>window slide value : 1</li>
+ * </ul>
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see following output on
+ * console:
+ *
+ * <pre>
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * 2013-06-17 14:38:55,201 [main] INFO  stram.StramLocalCluster run - Application finished.
+ * 2013-06-17 14:38:55,201 [container-2] INFO  stram.StramChild processHeartbeatResponse - Received shutdown request
+ * </pre>
+ *
+ * Scaling Options : <br>
+ * User can scale application by setting intial partition size > 1 on count
+ * unique operator. <br>
+ * <br>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 500ms(default) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The twitterFeed operator : </b> This operator samples random public
+ * statues from twitter and emits to application. <br>
+ * Class : com.datatorrent.examples.twitter.TwitterSampleInput <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The urlExtractor operator : </b> This operator extracts url from
+ * random sampled statues from twitter. <br>
+ * Class : {@link TwitterStatusURLExtractor} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each
+ * url extracted from random samples. <br>
+ * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b> The topCounts operator : </b> This operator caluculates top url in last 1
+ * min sliding window count 1. <br>
+ * Class : com.datatorrent.lib.algo.WindowedTopCounter <br>
+ * StateFull : Yes, sliding window count 120 (1 min) <br>
+ * </li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). <br>
+ * </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = TwitterTopCounterApplication.APP_NAME)
+public class TwitterTopCounterApplication implements StreamingApplication
+{
+  public static final String SNAPSHOT_SCHEMA = "twitterURLDataSchema.json";
+  public static final String CONVERSION_SCHEMA = "twitterURLConverterSchema.json";
+  public static final String APP_NAME = "TwitterExample";
+
+  private final Locality locality = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Setup the operator to get the data from twitter sample stream injected into the system.
+    TwitterSampleInput twitterFeed = new TwitterSampleInput();
+    twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
+
+    //  Setup the operator to get the URLs extracted from the twitter statuses
+    TwitterStatusURLExtractor urlExtractor = dag.addOperator("URLExtractor", TwitterStatusURLExtractor.class);
+
+    // Setup a node to count the unique urls within a window.
+    UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueURLCounter", new UniqueCounter<String>());
+    // Get the aggregated url counts and count them over last 5 mins.
+    dag.setAttribute(uniqueCounter, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 600);
+    dag.setAttribute(uniqueCounter, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 1);
+
+
+    WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
+    topCounts.setTopCount(10);
+    topCounts.setSlidingWindowWidth(1);
+    topCounts.setDagWindowWidth(1);
+
+    // Feed the statuses from feed into the input of the url extractor.
+    dag.addStream("TweetStream", twitterFeed.status, urlExtractor.input).setLocality(Locality.CONTAINER_LOCAL);
+    //  Start counting the urls coming out of URL extractor
+    dag.addStream("TwittedURLs", urlExtractor.url, uniqueCounter.data).setLocality(locality);
+    // Count unique urls
+    dag.addStream("UniqueURLCounts", uniqueCounter.count, topCounts.input);
+
+    consoleOutput(dag, "topURLs", topCounts.output, SNAPSHOT_SCHEMA, "url");
+  }
+
+  public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias)
+  {
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+    if (!StringUtils.isEmpty(gatewayAddress)) {
+      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+      AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
+
+      Map<String, String> conversionMap = Maps.newHashMap();
+      conversionMap.put(alias, WindowedTopCounter.FIELD_TYPE);
+      String snapshotServerJSON = SchemaUtils.jarResourceFileToString(schemaFile);
+
+      snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
+      snapshotServer.setTableFieldToMapField(conversionMap);
+
+      PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
+      wsQuery.setUri(uri);
+      snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
+
+      PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
+      wsResult.setUri(uri);
+      Operator.InputPort<String> queryResultPort = wsResult.input;
+
+      dag.addStream("MapProvider", topCount, snapshotServer.input);
+      dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
+    } else {
+      ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator());
+      operator.setStringFormat(operatorName + ": %s");
+
+      dag.addStream("MapProvider", topCount, operator.input);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java
new file mode 100644
index 0000000..fb274ea
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.algo.UniqueCounter;
+
+/**
+ * This application is same as other twitter example
+ * {@link TwitterTopCounterApplication} <br>
+ * Run Sample :
+ *
+ * <pre>
+ * 2013-06-17 16:50:34,911 [Twitter Stream consumer-1[Establishing connection]] INFO  twitter4j.TwitterStreamImpl info - Connection established.
+ * 2013-06-17 16:50:34,912 [Twitter Stream consumer-1[Establishing connection]] INFO  twitter4j.TwitterStreamImpl info - Receiving status stream.
+ * topWords: {}
+ * topWords: {love=1, ate=1, catch=1, calma=1, Phillies=1, ela=1, from=1, running=1}
+ * </pre>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = TwitterTopWordsApplication.APP_NAME)
+public class TwitterTopWordsApplication implements StreamingApplication
+{
+  public static final String SNAPSHOT_SCHEMA = "twitterWordDataSchema.json";
+  public static final String CONVERSION_SCHEMA = "twitterWordConverterSchema.json";
+  public static final String APP_NAME = "RollingTopWordsExample";
+  public static final String PROP_USE_APPDATA = "dt.application." + APP_NAME + ".useAppData";
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TwitterSampleInput twitterFeed = new TwitterSampleInput();
+    twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
+
+    TwitterStatusWordExtractor wordExtractor = dag.addOperator("WordExtractor", TwitterStatusWordExtractor.class);
+    UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueWordCounter", new UniqueCounter<String>());
+    WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
+
+    topCounts.setSlidingWindowWidth(120);
+    topCounts.setDagWindowWidth(1);
+
+    dag.addStream("TweetStream", twitterFeed.text, wordExtractor.input);
+    dag.addStream("TwittedWords", wordExtractor.output, uniqueCounter.data);
+    dag.addStream("UniqueWordCounts", uniqueCounter.count, topCounts.input).setLocality(Locality.CONTAINER_LOCAL);
+
+    TwitterTopCounterApplication.consoleOutput(dag, "topWords", topCounts.output, SNAPSHOT_SCHEMA, "word");
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java
new file mode 100644
index 0000000..7a03a64
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.algo.UniqueCounter;
+
+/**
+ * Twitter Example Application: <br>
+ * This example application samples random public status from twitter, send to Hashtag
+ * extractor. <br>
+ * Top 10 Hashtag(s) mentioned in tweets in last 5 mins are displayed on every
+ * window count (500ms).<br>
+ * <br>
+ *
+ * Real Time Calculation :<br>
+ * This application calculates top 10 Hashtag mentioned in tweets in last 5
+ * minutes across a 1% random tweet sampling on a rolling window basis.<br>
+ * <br>
+ * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>
+ * and configure the authentication. For launch from CLI, those go into ~/.dt/dt-site.xml:
+ * <pre>
+ * {@code
+ * <?xml version="1.0" encoding="UTF-8"?>
+ * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+ * <configuration>
+ *
+ *   <property> <name>dt.operator.TweetSampler.prop.consumerKey</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.prop.accessToken</name>
+ *   <value>TBD</value> </property>
+ *
+ *   <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name>
+ *   <value>TBD</value> </property>
+ * </configuration>
+ * }
+ * </pre>
+ * Custom Attributes: <br>
+ * <b>topCounts operator : <b>
+ * <ul>
+ * <li>Top Count : 10, number of top unique Hashtag to be reported.</li>
+ * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
+ * <li>window slide value : 1</li>
+ * </ul>
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see similar output on console as below:
+ *
+ * <pre>
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * 2013-06-17 14:38:55,201 [main] INFO  stram.StramLocalCluster run - Application finished.
+ * 2013-06-17 14:38:55,201 [container-2] INFO  stram.StramChild processHeartbeatResponse - Received shutdown request
+ * </pre>
+ *
+ * Scaling Options : <br>
+ * User can scale application by setting intial partition size > 1 on count
+ * unique operator. <br>
+ * <br>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 500ms(default) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The twitterFeed operator : </b> This operator samples random public
+ * statues from twitter and emits to application. <br>
+ * Class : com.datatorrent.examples.twitter.TwitterSampleInput <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtag from
+ * random sampled statues from twitter. <br>
+ * Class : {@link TwitterStatusHashtagExtractor} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each
+ * Hashtag extracted from random samples. <br>
+ * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b> The topCounts operator : </b> This operator caluculates top Hashtag in last 1
+ * min sliding window count 1. <br>
+ * Class : com.datatorrent.lib.algo.WindowedTopCounter <br>
+ * StateFull : Yes, sliding window count 120 (1 min) <br>
+ * </li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). <br>
+ * </li>
+ * </ul>
+ *
+ * @since 1.0.2
+ */
+@ApplicationAnnotation(name = TwitterTrendingHashtagsApplication.APP_NAME)
+public class TwitterTrendingHashtagsApplication implements StreamingApplication
+{
+  public static final String SNAPSHOT_SCHEMA = "twitterHashTagDataSchema.json";
+  public static final String CONVERSION_SCHEMA = "twitterHashTagConverterSchema.json";
+  public static final String APP_NAME = "TwitterTrendingExample";
+  public static final String PROP_USE_APPDATA = "dt.application." + APP_NAME + ".useAppData";
+
+  private final Locality locality = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Setup the operator to get the data from twitter sample stream injected into the system.
+    TwitterSampleInput twitterFeed = new TwitterSampleInput();
+    twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
+
+    // Setup a node to count the unique Hashtags within a window.
+    UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>());
+
+    // Get the aggregated Hashtag counts and count them over last 5 mins.
+    WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
+    topCounts.setTopCount(10);
+    topCounts.setSlidingWindowWidth(600);
+    topCounts.setDagWindowWidth(1);
+
+    dag.addStream("TwittedHashtags", twitterFeed.hashtag, uniqueCounter.data).setLocality(locality);
+    // Count unique Hashtags
+    dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input);
+
+    TwitterTopCounterApplication.consoleOutput(dag, "topHashtags", topCounts.output, SNAPSHOT_SCHEMA, "hashtag");
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java
new file mode 100644
index 0000000..0c3f481
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.nio.ByteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>URLSerDe class.</p>
+ *
+ * @since 0.3.2
+ */
+public class URLSerDe implements StreamCodec<byte[]>
+{
+  /**
+   * Covert the bytes into object useful for downstream node.
+   *
+   * @param fragment
+   * @return WindowedURLHolder object which represents the bytes.
+   */
+  @Override
+  public byte[] fromByteArray(Slice fragment)
+  {
+    if (fragment == null || fragment.buffer == null) {
+      return null;
+    } else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) {
+      return fragment.buffer;
+    } else {
+      byte[] buffer = new byte[fragment.buffer.length];
+      System.arraycopy(fragment.buffer, fragment.offset, buffer, 0, fragment.length);
+      return buffer;
+    }
+  }
+
+  /**
+   * Cast the input object to byte[].
+   *
+   * @param object - byte array representing the bytes of the string
+   * @return the same object as input
+   */
+  @Override
+  public Slice toByteArray(byte[] object)
+  {
+    return new Slice(object, 0, object.length);
+  }
+
+  @Override
+  public int getPartition(byte[] object)
+  {
+    ByteBuffer bb = ByteBuffer.wrap(object);
+    return bb.hashCode();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(URLSerDe.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java
new file mode 100644
index 0000000..8ecd6ae
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ *
+ * WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size.
+ * The operator expects to receive a map object which contains a set of objects mapped to their respective frequency of
+ * occurrences. e.g. if we are looking at most commonly occurring names then the operator expects to receive the tuples
+ * of type Map<String, Intenger> on its input port, and at the end of the window it emits 1 object of type Map<String, Integer>
+ * with a pre determined size. The emitted object contains the most frequently occurring keys.
+ *
+ * @param <T> Type of the key in the map object which is accepted on input port as payload. Note that this key must be HashMap friendly.
+ * @since 0.3.2
+ */
+public class WindowedTopCounter<T> extends BaseOperator
+{
+  public static final String FIELD_TYPE = "type";
+  public static final String FIELD_COUNT = "count";
+
+  private static final Logger logger = LoggerFactory.getLogger(WindowedTopCounter.class);
+
+  private PriorityQueue<SlidingContainer<T>> topCounter;
+  private int windows;
+  private int topCount = 10;
+  private int slidingWindowWidth;
+  private int dagWindowWidth;
+  private HashMap<T, SlidingContainer<T>> objects = new HashMap<T, SlidingContainer<T>>();
+
+  /**
+   * Input port on which map objects containing keys with their respective frequency as values will be accepted.
+   */
+  public final transient DefaultInputPort<Map<T, Integer>> input = new DefaultInputPort<Map<T, Integer>>()
+  {
+    @Override
+    public void process(Map<T, Integer> map)
+    {
+      for (Map.Entry<T, Integer> e : map.entrySet()) {
+        SlidingContainer<T> holder = objects.get(e.getKey());
+        if (holder == null) {
+          holder = new SlidingContainer<T>(e.getKey(), windows);
+          objects.put(e.getKey(), holder);
+        }
+        holder.adjustCount(e.getValue());
+      }
+    }
+  };
+
+  public final transient DefaultOutputPort<List<Map<String, Object>>> output = new DefaultOutputPort<List<Map<String, Object>>>();
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    windows = (int)(slidingWindowWidth / dagWindowWidth) + 1;
+    if (slidingWindowWidth % dagWindowWidth != 0) {
+      logger.warn("slidingWindowWidth(" + slidingWindowWidth + ") is not exact multiple of dagWindowWidth(" + dagWindowWidth + ")");
+    }
+
+    topCounter = new PriorityQueue<SlidingContainer<T>>(this.topCount, new TopSpotComparator());
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    topCounter.clear();
+  }
+
+  @Override
+  public void endWindow()
+  {
+    Iterator<Map.Entry<T, SlidingContainer<T>>> iterator = objects.entrySet().iterator();
+    int i = topCount;
+
+    /*
+     * Try to fill the priority queue with the first topCount URLs.
+     */
+    SlidingContainer<T> holder;
+    while (iterator.hasNext()) {
+      holder = iterator.next().getValue();
+      holder.slide();
+
+      if (holder.totalCount == 0) {
+        iterator.remove();
+      } else {
+        topCounter.add(holder);
+        if (--i == 0) {
+          break;
+        }
+      }
+    }
+    logger.debug("objects.size(): {}", objects.size());
+
+    /*
+     * Make room for the new element in the priority queue by deleting the
+     * smallest one, if we KNOW that the new element is useful to us.
+     */
+    if (i == 0) {
+      int smallest = topCounter.peek().totalCount;
+      while (iterator.hasNext()) {
+        holder = iterator.next().getValue();
+        holder.slide();
+
+        if (holder.totalCount > smallest) {
+          topCounter.poll();
+          topCounter.add(holder);
+          smallest = topCounter.peek().totalCount;
+        } else if (holder.totalCount == 0) {
+          iterator.remove();
+        }
+      }
+    }
+
+    List<Map<String, Object>> data = Lists.newArrayList();
+
+    Iterator<SlidingContainer<T>> topIter = topCounter.iterator();
+
+    while (topIter.hasNext()) {
+      final SlidingContainer<T> wh = topIter.next();
+      Map<String, Object> tableRow = Maps.newHashMap();
+
+      tableRow.put(FIELD_TYPE, wh.identifier.toString());
+      tableRow.put(FIELD_COUNT, wh.totalCount);
+
+      data.add(tableRow);
+    }
+
+    Collections.sort(data, TwitterOutputSorter.INSTANCE);
+
+    output.emit(data);
+    topCounter.clear();
+  }
+
+  @Override
+  public void teardown()
+  {
+    topCounter = null;
+    objects = null;
+  }
+
+  /**
+   * Set the count of most frequently occurring keys to emit per map object.
+   *
+   * @param count count of the objects in the map emitted at the output port.
+   */
+  public void setTopCount(int count)
+  {
+    topCount = count;
+  }
+
+  public int getTopCount()
+  {
+    return topCount;
+  }
+
+  /**
+   * @return the windows
+   */
+  public int getWindows()
+  {
+    return windows;
+  }
+
+  /**
+   * @param windows the windows to set
+   */
+  public void setWindows(int windows)
+  {
+    this.windows = windows;
+  }
+
+  /**
+   * @return the slidingWindowWidth
+   */
+  public int getSlidingWindowWidth()
+  {
+    return slidingWindowWidth;
+  }
+
+  /**
+   * Set the width of the sliding window.
+   *
+   * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently
+   * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms,
+   * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000.
+   *
+   * @param slidingWindowWidth - Sliding window width to be set for this operator, recommended to be multiple of DAG window.
+   */
+  public void setSlidingWindowWidth(int slidingWindowWidth)
+  {
+    this.slidingWindowWidth = slidingWindowWidth;
+  }
+
+  /**
+   * @return the dagWindowWidth
+   */
+  public int getDagWindowWidth()
+  {
+    return dagWindowWidth;
+  }
+
+  /**
+   * Set the width of the sliding window.
+   *
+   * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently
+   * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms,
+   * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000.
+   *
+   * @param dagWindowWidth - DAG's native window width. It has to be the value of the native window set at the application level.
+   */
+  public void setDagWindowWidth(int dagWindowWidth)
+  {
+    this.dagWindowWidth = dagWindowWidth;
+  }
+
+  static class TopSpotComparator implements Comparator<SlidingContainer<?>>
+  {
+    @Override
+    public int compare(SlidingContainer<?> o1, SlidingContainer<?> o2)
+    {
+      if (o1.totalCount > o2.totalCount) {
+        return 1;
+      } else if (o1.totalCount < o2.totalCount) {
+        return -1;
+      }
+
+      return 0;
+    }
+  }
+
+  private static class TwitterOutputSorter implements Comparator<Map<String, Object>>
+  {
+    public static final TwitterOutputSorter INSTANCE = new TwitterOutputSorter();
+
+    private TwitterOutputSorter()
+    {
+    }
+
+    @Override
+    public int compare(Map<String, Object> o1, Map<String, Object> o2)
+    {
+      Integer count1 = (Integer)o1.get(FIELD_COUNT);
+      Integer count2 = (Integer)o2.get(FIELD_COUNT);
+
+      return count1.compareTo(count2);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif
new file mode 100644
index 0000000..d21e1d9
Binary files /dev/null and b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java
new file mode 100644
index 0000000..956ff08
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Twitter top URL's demonstration application.
+ */
+package org.apache.apex.examples.twitter;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml b/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
new file mode 100644
index 0000000..47b6531
--- /dev/null
+++ b/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
@@ -0,0 +1,52 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+<!-- properties for the twitter kinesis example -->
+<configuration>
+
+    <property>
+        <name>dt.operator.FromKinesis.streamName</name>
+        <value>TwitterTag</value>
+    </property>
+    <property>
+        <name>dt.operator.FromKinesis.accessKey</name>
+    </property>
+    <property>
+        <name>dt.operator.FromKinesis.secretKey</name>
+    </property>
+    <property>
+        <name>dt.operator.FromKinesis.endPoint</name>
+    </property>
+    <property>
+        <name>dt.operator.ToKinesis.streamName</name>
+        <value>TwitterTag</value>
+    </property>
+    <property>
+        <name>dt.operator.ToKinesis.accessKey</name>
+    </property>
+    <property>
+        <name>dt.operator.ToKinesis.secretKey</name>
+    </property>
+    <property>
+        <name>dt.operator.ToKinesis.endPoint</name>
+    </property>
+
+</configuration>