You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:08 UTC
[03/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
new file mode 100644
index 0000000..7bbb8ec
--- /dev/null
+++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql.sample;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+
+public class SQLApplicationWithModelFileTest
+{
+ private TimeZone defaultTZ;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ defaultTZ = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ TimeZone.setDefault(defaultTZ);
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithModelFile.xml"));
+
+ SQLApplicationWithModelFile app = new SQLApplicationWithModelFile();
+
+ lma.prepareDAG(app, conf);
+
+ LocalMode.Controller lc = lma.getController();
+
+ PrintStream originalSysout = System.out;
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(baos));
+
+ lc.runAsync();
+ waitTillStdoutIsPopulated(baos, 30000);
+ lc.shutdown();
+
+ System.setOut(originalSysout);
+
+ String[] sout = baos.toString().split(System.lineSeparator());
+ Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+
+ String[] actualLines = filter.toArray(new String[filter.size()]);
+ Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1"));
+ Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2"));
+ Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3"));
+ Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4"));
+ Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5"));
+ Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6"));
+ }
+
+ public static boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException,
+ IOException
+ {
+ long now = System.currentTimeMillis();
+ Collection<String> filter = Lists.newArrayList();
+ while (System.currentTimeMillis() - now < timeout) {
+ baos.flush();
+ String[] sout = baos.toString().split(System.lineSeparator());
+ filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:"));
+ if (filter.size() != 0) {
+ break;
+ }
+
+ Thread.sleep(500);
+ }
+
+ return (filter.size() != 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/resources/input.csv
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/resources/input.csv b/examples/sql/src/test/resources/input.csv
new file mode 100644
index 0000000..c4786d1
--- /dev/null
+++ b/examples/sql/src/test/resources/input.csv
@@ -0,0 +1,6 @@
+15/02/2016 10:15:00 +0000,1,paint1,11
+15/02/2016 10:16:00 +0000,2,paint2,12
+15/02/2016 10:17:00 +0000,3,paint3,13
+15/02/2016 10:18:00 +0000,4,paint4,14
+15/02/2016 10:19:00 +0000,5,paint5,15
+15/02/2016 10:10:00 +0000,6,abcde6,16
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/sql/src/test/resources/log4j.properties b/examples/sql/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8ea3cfe
--- /dev/null
+++ b/examples/sql/src/test/resources/log4j.properties
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=WARN
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.calcite=WARN
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.kafka.consumer=WARN
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml
new file mode 100644
index 0000000..76924c9
--- /dev/null
+++ b/examples/twitter/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>malhar-examples-twitter</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar Twitter Example</name>
+ <description>Twitter Rolling Top Words application demonstrates real-time computations over a sliding window.</description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.7.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <skipTests>true</skipTests>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <!-- required by twitter example -->
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
+ <version>4.0.6</version>
+ </dependency>
+ <dependency>
+ <!-- required by twitter example -->
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>4.0.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>0.94.20</version>
+ <type>jar</type>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>5.1.22</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-contrib</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-kinesis</artifactId>
+ <version>1.9.10</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>2.4.4</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ <version>6.6.4</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/twitter/src/assemble/appPackage.xml b/examples/twitter/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/twitter/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
new file mode 100644
index 0000000..be7edfb
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.net.URI;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
+import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
+import com.datatorrent.contrib.kinesis.KinesisStringOutputOperator;
+import com.datatorrent.contrib.kinesis.ShardManager;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
+
+/**
+ * Twitter Example Application: <br>
+ * This example application samples random public status from twitter, send to Hashtag
+ * extractor and extract the status and send it into kinesis <br>
+ * Get the records from kinesis and converts into Hashtags. Top 10 Hashtag(s) mentioned in
+ * tweets in last 5 mins are displayed on every window count (500ms).<br>
+ * <br>
+ *
+ * Real Time Calculation :<br>
+ * This application calculates top 10 Hashtag mentioned in tweets in last 5
+ * minutes across a 1% random tweet sampling on a rolling window basis.<br>
+ * <br>
+ * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>,
+ * <a href="https://http://aws.amazon.com/">AWS Account</a> and configure the authentication details.
+ * For launch from CLI, those go into ~/.dt/dt-site.xml:
+ * <pre>
+ * {@code
+ * <?xml version="1.0" encoding="UTF-8"?>
+ * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+ * <configuration>
+ *
+ * <property> <name>dt.operator.TweetSampler.prop.consumerKey</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.prop.accessToken</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.FromKinesis.streamName</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.FromKinesis.accessKey</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.FromKinesis.secretKey</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.ToKinesis.streamName</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.ToKinesis.accessKey</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.ToKinesis.secretKey</name>
+ * <value>TBD</value> </property>
+ *
+ * </configuration>
+ * }
+ * </pre>
+ * Custom Attributes: <br>
+ * <b>topCounts operator : <b>
+ * <ul>
+ * <li>Top Count : 10, number of top unique Hashtag to be reported.</li>
+ * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
+ * <li>window slide value : 1</li>
+ * </ul>
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see similar output on console as below:
+ *
+ * <pre>
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished.
+ * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request
+ * </pre>
+ *
+ * Scaling Options : <br>
+ * User can scale application by setting intial partition size > 1 on count
+ * unique operator. <br>
+ * <br>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 500ms(default) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The twitterFeed operator : </b> This operator samples random public
+ * statues from twitter and emits to application. <br>
+ * Class : org.apache.apex.examples.twitter.TwitterSampleInput <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtag from
+ * random sampled statues from twitter. <br>
+ * Class : {@link TwitterStatusHashtagExtractor} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The outputOp operator : </b> This operator sent the tags into the kinesis. <br>
+ * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br>
+ * </li>
+ * <li><b>The inputOp operator : </b> This operator fetches the records from kinesis and
+ * converts into hastags and emits them. <br>
+ * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br>
+ * </li>
+ * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each
+ * Hashtag extracted from random samples. <br>
+ * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b> The topCounts operator : </b> This operator caluculates top Hashtag in last 1
+ * min sliding window count 1. <br>
+ * Class : com.datatorrent.lib.algo.WindowedTopCounter <br>
+ * StateFull : Yes, sliding window count 120 (1 min) <br>
+ * </li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). <br>
+ * </li>
+ * </ul>
+ *
+ * @since 2.0.0
+ */
+@ApplicationAnnotation(name = "TwitterKinesisExample")
+public class KinesisHashtagsApplication implements StreamingApplication
+{
+ private final Locality locality = null;
+
+ private InputPort<Object> consoleOutput(DAG dag, String operatorName)
+ {
+ String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+ if (!StringUtils.isEmpty(gatewayAddress)) {
+ URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+ String topic = "examples.twitter." + operatorName;
+ //LOG.info("WebSocket with gateway at: {}", gatewayAddress);
+ PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
+ wsOut.setUri(uri);
+ wsOut.setTopic(topic);
+ return wsOut.input;
+ }
+ ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator());
+ operator.setStringFormat(operatorName + ": %s");
+ return operator.input;
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // Setup the operator to get the data from twitter sample stream injected into the system.
+ TwitterSampleInput twitterFeed = new TwitterSampleInput();
+ twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
+
+ // Setup the operator to get the Hashtags extracted from the twitter statuses
+ TwitterStatusHashtagExtractor HashtagExtractor = dag.addOperator("HashtagExtractor", TwitterStatusHashtagExtractor.class);
+
+ //Setup the operator send the twitter statuses to kinesis
+ KinesisStringOutputOperator outputOp = dag.addOperator("ToKinesis", new KinesisStringOutputOperator());
+ outputOp.setBatchSize(500);
+
+ // Feed the statuses from feed into the input of the Hashtag extractor.
+ dag.addStream("TweetStream", twitterFeed.status, HashtagExtractor.input).setLocality(Locality.CONTAINER_LOCAL);
+ // Start counting the Hashtags coming out of Hashtag extractor
+ dag.addStream("SendToKinesis", HashtagExtractor.hashtags, outputOp.inputPort).setLocality(locality);
+
+ //------------------------------------------------------------------------------------------
+
+ KinesisStringInputOperator inputOp = dag.addOperator("FromKinesis", new KinesisStringInputOperator());
+ ShardManager shardStats = new ShardManager();
+ inputOp.setShardManager(shardStats);
+ inputOp.getConsumer().setRecordsLimit(600);
+ inputOp.setStrategy(AbstractKinesisInputOperator.PartitionStrategy.MANY_TO_ONE.toString());
+
+ // Setup a node to count the unique Hashtags within a window.
+ UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>());
+
+ // Get the aggregated Hashtag counts and count them over last 5 mins.
+ WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
+ topCounts.setTopCount(10);
+ topCounts.setSlidingWindowWidth(600);
+ topCounts.setDagWindowWidth(1);
+
+ dag.addStream("TwittedHashtags", inputOp.outputPort, uniqueCounter.data).setLocality(locality);
+
+ // Count unique Hashtags
+ dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input).setLocality(locality);
+ // Count top 10
+ dag.addStream("TopHashtags", topCounts.output, consoleOutput(dag, "topHashtags")).setLocality(locality);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java
new file mode 100644
index 0000000..dfcac81
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.io.Serializable;
+
/**
 * Keeps a running count for a single identifier over a fixed ring of window slots.
 * <p>
 * Counts are added to the current slot with {@link #adjustCount(int)}. {@link #slide()} moves to the next slot.
 * It folds the slot just finished into {@code totalCount}, subtracts the value held in the slot being reused,
 * and clears that slot. {@code totalCount} therefore reflects the slots other than the current one.
 *
 * @param <T> Type of object for which sliding window is being maintained.
 * @since 0.3.2
 */
public class SlidingContainer<T> implements Serializable
{
  private static final long serialVersionUID = 201305291751L;
  T identifier;
  int totalCount;
  int position;
  int[] windowedCount;

  @SuppressWarnings("unused")
  private SlidingContainer()
  {
    /* needed for Kryo serialization */
  }

  /**
   * @param identifier the object whose occurrences are being counted
   * @param windowCount number of slots in the ring
   */
  public SlidingContainer(T identifier, int windowCount)
  {
    windowedCount = new int[windowCount];
    this.identifier = identifier;
    // totalCount and position start at their default value of 0.
  }

  /** Adds {@code i} to the slot currently being filled. */
  public void adjustCount(int i)
  {
    windowedCount[position] += i;
  }

  /** Advances to the next slot (wrapping around), updating the running total. */
  public void slide()
  {
    final int finished = windowedCount[position];
    position = (position + 1) % windowedCount.length;
    final int evicted = windowedCount[position];
    totalCount += finished - evicted;
    windowedCount[position] = 0;
  }

  @Override
  public String toString()
  {
    return new StringBuilder().append(identifier).append(" => ").append(totalCount).toString();
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java
new file mode 100644
index 0000000..62ec6cf
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+
+import twitter4j.Status;
+
+/**
+ * An application which connects to Twitter Sample Input and stores all the
+ * tweets with their usernames in a mysql database. Please review the docs
+ * for TwitterTopCounterApplication to setup your twitter credentials. You
+ * may also be able to change JDBCStore credentials using config file.
+ *
+ * You will also need to create appropriate database and tables with the
+ * following schema, also included in mysql.sql in resources:
+ * <pre>
+ * DROP TABLE if exists tweets;
+ * CREATE TABLE tweets (
+ * window_id LONG NOT NULL,
+ * creation_date DATE,
+ * text VARCHAR(256) NOT NULL,
+ * userid VARCHAR(40) NOT NULL,
+ * KEY ( userid, creation_date)
+ * );
+ *
+ * drop table if exists dt_window_id_tracker;
+ * CREATE TABLE dt_window_id_tracker (
+ * dt_application_id VARCHAR(100) NOT NULL,
+ * dt_operator_id int(11) NOT NULL,
+ * dt_window_id bigint NOT NULL,
+ * UNIQUE (dt_application_id, dt_operator_id, dt_window_id)
+ * ) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+ * </pre>
+ *
+ * @since 0.9.4
+ */
+@ApplicationAnnotation(name = "TwitterDumpExample")
+public class TwitterDumpApplication implements StreamingApplication
+{
+ public static class Status2Database extends AbstractJdbcTransactionableOutputOperator<Status>
+ {
+ public static final String INSERT_STATUS_STATEMENT = "insert into tweets (window_id, creation_date, text, userid) values (?, ?, ?, ?)";
+
+ public Status2Database()
+ {
+ store.setMetaTable("dt_window_id_tracker");
+ store.setMetaTableAppIdColumn("dt_application_id");
+ store.setMetaTableOperatorIdColumn("dt_operator_id");
+ store.setMetaTableWindowColumn("dt_window_id");
+ }
+
+ @Nonnull
+ @Override
+ protected String getUpdateCommand()
+ {
+ return INSERT_STATUS_STATEMENT;
+ }
+
+ @Override
+ protected void setStatementParameters(PreparedStatement statement, Status tuple) throws SQLException
+ {
+ statement.setLong(1, currentWindowId);
+
+ statement.setDate(2, new java.sql.Date(tuple.getCreatedAt().getTime()));
+ statement.setString(3, tuple.getText());
+ statement.setString(4, tuple.getUser().getScreenName());
+ statement.addBatch();
+ }
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");
+
+ TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());
+
+ //ConsoleOutputOperator dbWriter = dag.addOperator("DatabaseWriter", new ConsoleOutputOperator());
+
+ Status2Database dbWriter = dag.addOperator("DatabaseWriter", new Status2Database());
+ dbWriter.getStore().setDatabaseDriver("com.mysql.jdbc.Driver");
+ dbWriter.getStore().setDatabaseUrl("jdbc:mysql://node6.morado.com:3306/twitter");
+ dbWriter.getStore().setConnectionProperties("user:twitter");
+
+ dag.addStream("Statuses", twitterStream.status, dbWriter.input).setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java
new file mode 100644
index 0000000..3169132
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+
+import twitter4j.Status;
+
+/**
+ * An application which connects to Twitter Sample Input and stores all the
+ * tweets with their usernames in a hbase database. Please review the docs
+ * for TwitterTopCounterApplication to setup your twitter credentials.
+ *
+ * You need to create the HBase table to run this example. Table name can be
+ * configured but columnfamily must be 'cf' to make this example simple and complied
+ * with the mysql based example.
+ * create 'tablename', 'cf'
+ *
+ * </pre>
+ *
+ * @since 1.0.2
+ */
+@ApplicationAnnotation(name = "TwitterDumpHBaseExample")
+public class TwitterDumpHBaseApplication implements StreamingApplication
+{
+
+ public static class Status2Hbase extends AbstractHBasePutOutputOperator<Status>
+ {
+
+ @Override
+ public Put operationPut(Status t)
+ {
+ Put put = new Put(ByteBuffer.allocate(8).putLong(t.getCreatedAt().getTime()).array());
+ put.add("cf".getBytes(), "text".getBytes(), t.getText().getBytes());
+ put.add("cf".getBytes(), "userid".getBytes(), t.getText().getBytes());
+ return put;
+ }
+
+ }
+
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");
+
+ TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());
+
+ Status2Hbase hBaseWriter = dag.addOperator("DatabaseWriter", new Status2Hbase());
+
+ dag.addStream("Statuses", twitterStream.status, hBaseWriter.input).setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java
new file mode 100644
index 0000000..7b468ec
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import twitter4j.HashtagEntity;
+import twitter4j.Status;
+
+/**
+ * <p>TwitterStatusHashtagExtractor class.</p>
+ *
+ * @since 1.0.2
+ */
+public class TwitterStatusHashtagExtractor extends BaseOperator
+{
+ public final transient DefaultOutputPort<String> hashtags = new DefaultOutputPort<String>();
+ public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>()
+ {
+ @Override
+ public void process(Status status)
+ {
+ HashtagEntity[] entities = status.getHashtagEntities();
+ if (entities != null) {
+ for (HashtagEntity he : entities) {
+ if (he != null) {
+ hashtags.emit(he.getText());
+ }
+ }
+ }
+ }
+
+ };
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java
new file mode 100644
index 0000000..40a7d3d
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+import twitter4j.Status;
+import twitter4j.URLEntity;
+
+/**
+ * <p>TwitterStatusURLExtractor class.</p>
+ *
+ * @since 0.3.2
+ */
+public class TwitterStatusURLExtractor extends BaseOperator
+{
+ public final transient DefaultOutputPort<String> url = new DefaultOutputPort<String>();
+ public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>()
+ {
+ @Override
+ public void process(Status status)
+ {
+ URLEntity[] entities = status.getURLEntities();
+ if (entities != null) {
+ for (URLEntity ue: entities) {
+ if (ue != null) { // see why we intermittently get NPEs
+ url.emit((ue.getExpandedURL() == null ? ue.getURL() : ue.getExpandedURL()).toString());
+ }
+ }
+ }
+ }
+ };
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwitterStatusURLExtractor.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java
new file mode 100644
index 0000000..6581b76
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * <p>TwitterStatusWordExtractor class.</p>
+ *
+ * @since 0.3.2
+ */
+public class TwitterStatusWordExtractor extends BaseOperator
+{
+ public HashSet<String> filterList;
+
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+ public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+ {
+ @Override
+ public void process(String text)
+ {
+ String[] strs = text.split(" ");
+ if (strs != null) {
+ for (String str : strs) {
+ if (str != null && !filterList.contains(str) ) {
+ output.emit(str);
+ }
+ }
+ }
+ }
+ };
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ this.filterList = new HashSet<String>(Arrays.asList(new String[]{"", " ","I","you","the","a","to","as","he","him","his","her","she","me","can","for","of","and","or","but",
+ "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when",
+ "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"}));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
new file mode 100644
index 0000000..ee43383
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.OutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+
+/**
+ * Twitter Example Application: <br>
+ * This example application samples random public status from twitter, send to url
+ * extractor. <br>
+ * Top 10 url(s) mentioned in tweets in last 5 mins are displayed on every
+ * window count (500ms).<br>
+ * <br>
+ *
+ * Real Time Calculation :<br>
+ * This application calculates top 10 url mentioned in tweets in last 5
+ * minutes across a 1% random tweet sampling on a rolling window basis.<br>
+ * <br>
+ * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>
+ * and configure the authentication. For launch from CLI, those go into ~/.dt/dt-site.xml:
+ * <pre>
+ * {@code
+ * <?xml version="1.0" encoding="UTF-8"?>
+ * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+ * <configuration>
+ *
+ * <property> <name>dt.operator.TweetSampler.consumerKey</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.consumerSecret</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.accessToken</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.accessTokenSecret</name>
+ * <value>TBD</value> </property>
+ * </configuration>
+ * }
+ * </pre>
+ * Custom Attributes: <br>
+ * <b>topCounts operator : <b>
+ * <ul>
+ * <li>Top Count : 10, number of top unique url to be reported.</li>
+ * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
+ * <li>window slide value : 1</li>
+ * </ul>
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see following output on
+ * console:
+ *
+ * <pre>
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4}
+ * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished.
+ * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request
+ * </pre>
+ *
+ * Scaling Options : <br>
+ * User can scale application by setting intial partition size > 1 on count
+ * unique operator. <br>
+ * <br>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 500ms(default) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The twitterFeed operator : </b> This operator samples random public
+ * statues from twitter and emits to application. <br>
+ * Class : com.datatorrent.examples.twitter.TwitterSampleInput <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The urlExtractor operator : </b> This operator extracts url from
+ * random sampled statues from twitter. <br>
+ * Class : {@link TwitterStatusURLExtractor} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each
+ * url extracted from random samples. <br>
+ * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
+ * StateFull : No, window count 1 <br>
+ * </li>
+ * <li><b> The topCounts operator : </b> This operator caluculates top url in last 1
+ * min sliding window count 1. <br>
+ * Class : com.datatorrent.lib.algo.WindowedTopCounter <br>
+ * StateFull : Yes, sliding window count 120 (1 min) <br>
+ * </li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). <br>
+ * </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = TwitterTopCounterApplication.APP_NAME)
+public class TwitterTopCounterApplication implements StreamingApplication
+{
+ public static final String SNAPSHOT_SCHEMA = "twitterURLDataSchema.json";
+ public static final String CONVERSION_SCHEMA = "twitterURLConverterSchema.json";
+ public static final String APP_NAME = "TwitterExample";
+
+ private final Locality locality = null;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // Setup the operator to get the data from twitter sample stream injected into the system.
+ TwitterSampleInput twitterFeed = new TwitterSampleInput();
+ twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
+
+ // Setup the operator to get the URLs extracted from the twitter statuses
+ TwitterStatusURLExtractor urlExtractor = dag.addOperator("URLExtractor", TwitterStatusURLExtractor.class);
+
+ // Setup a node to count the unique urls within a window.
+ UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueURLCounter", new UniqueCounter<String>());
+ // Get the aggregated url counts and count them over last 5 mins.
+ dag.setAttribute(uniqueCounter, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 600);
+ dag.setAttribute(uniqueCounter, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 1);
+
+
+ WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
+ topCounts.setTopCount(10);
+ topCounts.setSlidingWindowWidth(1);
+ topCounts.setDagWindowWidth(1);
+
+ // Feed the statuses from feed into the input of the url extractor.
+ dag.addStream("TweetStream", twitterFeed.status, urlExtractor.input).setLocality(Locality.CONTAINER_LOCAL);
+ // Start counting the urls coming out of URL extractor
+ dag.addStream("TwittedURLs", urlExtractor.url, uniqueCounter.data).setLocality(locality);
+ // Count unique urls
+ dag.addStream("UniqueURLCounts", uniqueCounter.count, topCounts.input);
+
+ consoleOutput(dag, "topURLs", topCounts.output, SNAPSHOT_SCHEMA, "url");
+ }
+
+ public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias)
+ {
+ String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+ if (!StringUtils.isEmpty(gatewayAddress)) {
+ URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+ AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
+
+ Map<String, String> conversionMap = Maps.newHashMap();
+ conversionMap.put(alias, WindowedTopCounter.FIELD_TYPE);
+ String snapshotServerJSON = SchemaUtils.jarResourceFileToString(schemaFile);
+
+ snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
+ snapshotServer.setTableFieldToMapField(conversionMap);
+
+ PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
+ wsQuery.setUri(uri);
+ snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
+
+ PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
+ wsResult.setUri(uri);
+ Operator.InputPort<String> queryResultPort = wsResult.input;
+
+ dag.addStream("MapProvider", topCount, snapshotServer.input);
+ dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
+ } else {
+ ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator());
+ operator.setStringFormat(operatorName + ": %s");
+
+ dag.addStream("MapProvider", topCount, operator.input);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java
new file mode 100644
index 0000000..fb274ea
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.algo.UniqueCounter;
+
+/**
+ * This application is same as other twitter example
+ * {@link TwitterTopCounterApplication} <br>
+ * Run Sample :
+ *
+ * <pre>
+ * 2013-06-17 16:50:34,911 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl info - Connection established.
+ * 2013-06-17 16:50:34,912 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl info - Receiving status stream.
+ * topWords: {}
+ * topWords: {love=1, ate=1, catch=1, calma=1, Phillies=1, ela=1, from=1, running=1}
+ * </pre>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = TwitterTopWordsApplication.APP_NAME)
+public class TwitterTopWordsApplication implements StreamingApplication
+{
+ public static final String SNAPSHOT_SCHEMA = "twitterWordDataSchema.json";
+ public static final String CONVERSION_SCHEMA = "twitterWordConverterSchema.json";
+ public static final String APP_NAME = "RollingTopWordsExample";
+ public static final String PROP_USE_APPDATA = "dt.application." + APP_NAME + ".useAppData";
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TwitterSampleInput twitterFeed = new TwitterSampleInput();
+ twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
+
+ TwitterStatusWordExtractor wordExtractor = dag.addOperator("WordExtractor", TwitterStatusWordExtractor.class);
+ UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueWordCounter", new UniqueCounter<String>());
+ WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
+
+ topCounts.setSlidingWindowWidth(120);
+ topCounts.setDagWindowWidth(1);
+
+ dag.addStream("TweetStream", twitterFeed.text, wordExtractor.input);
+ dag.addStream("TwittedWords", wordExtractor.output, uniqueCounter.data);
+ dag.addStream("UniqueWordCounts", uniqueCounter.count, topCounts.input).setLocality(Locality.CONTAINER_LOCAL);
+
+ TwitterTopCounterApplication.consoleOutput(dag, "topWords", topCounts.output, SNAPSHOT_SCHEMA, "word");
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java
new file mode 100644
index 0000000..7a03a64
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.algo.UniqueCounter;
+
+/**
+ * Twitter Example Application: <br>
+ * This example application samples random public statuses from Twitter and counts the
+ * hashtags they contain. <br>
+ * Top 10 Hashtag(s) mentioned in tweets in last 5 mins are displayed on every
+ * window count (500ms).<br>
+ * <br>
+ *
+ * Real Time Calculation :<br>
+ * This application calculates the top 10 hashtags mentioned in tweets in the last 5
+ * minutes across a 1% random tweet sample, on a rolling window basis.<br>
+ * <br>
+ * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>
+ * and configure the authentication. For launch from CLI, those go into ~/.dt/dt-site.xml:
+ * <pre>
+ * {@code
+ * <?xml version="1.0" encoding="UTF-8"?>
+ * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+ * <configuration>
+ *
+ * <property> <name>dt.operator.TweetSampler.prop.consumerKey</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.prop.accessToken</name>
+ * <value>TBD</value> </property>
+ *
+ * <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name>
+ * <value>TBD</value> </property>
+ * </configuration>
+ * }
+ * </pre>
+ * Custom Attributes: <br>
+ * <b>topCounts operator : </b>
+ * <ul>
+ * <li>Top Count : 10, number of top unique hashtags to be reported.</li>
+ * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li>
+ * <li>Window slide value : 1</li>
+ * </ul>
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new TwitterTrendingHashtagsApplication(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see similar output on console as below:
+ *
+ * <pre>
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"\u0645\u0639_\u0627\u0644\u0644\u0647\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"}
+ * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished.
+ * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request
+ * </pre>
+ *
+ * Scaling Options : <br>
+ * Users can scale the application by setting an initial partition size > 1 on the
+ * unique counter operator. <br>
+ * <br>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 500ms(default) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The twitterFeed operator : </b> This operator samples random public
+ * statuses from Twitter and emits them to the application. <br>
+ * Class : com.datatorrent.contrib.twitter.TwitterSampleInput <br>
+ * Stateful : No, window count 1 <br>
+ * </li>
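+ * <li><b>The HashtagExtractor operator : </b> This operator extracts hashtags from
+ * randomly sampled statuses from Twitter. (This application does not add it to the DAG;
+ * hashtags are taken directly from the TweetSampler's hashtag output port.) <br>
+ * Class : {@link TwitterStatusHashtagExtractor} <br>
+ * Stateful : No, window count 1 <br>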
+ * </li>
+ * <li><b>The uniqueCounter operator : </b> This operator aggregates the count for each
+ * hashtag extracted from random samples. <br>
+ * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
+ * Stateful : No, window count 1 <br>
+ * </li>
+ * <li><b> The topCounts operator : </b> This operator calculates the top hashtags over the last 5
+ * minutes, sliding by one window. <br>
+ * Class : org.apache.apex.examples.twitter.WindowedTopCounter <br>
+ * Stateful : Yes, sliding window count 600 (5 min) <br>
+ * </li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). <br>
+ * </li>
+ * </ul>
+ *
+ * @since 1.0.2
+ */
+@ApplicationAnnotation(name = TwitterTrendingHashtagsApplication.APP_NAME)
+public class TwitterTrendingHashtagsApplication implements StreamingApplication
+{
+ public static final String SNAPSHOT_SCHEMA = "twitterHashTagDataSchema.json";
+ public static final String CONVERSION_SCHEMA = "twitterHashTagConverterSchema.json";
+ public static final String APP_NAME = "TwitterTrendingExample";
+ public static final String PROP_USE_APPDATA = "dt.application." + APP_NAME + ".useAppData";
+
+ private final Locality locality = null;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // Setup the operator to get the data from twitter sample stream injected into the system.
+ TwitterSampleInput twitterFeed = new TwitterSampleInput();
+ twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
+
+ // Setup a node to count the unique Hashtags within a window.
+ UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>());
+
+ // Get the aggregated Hashtag counts and count them over last 5 mins.
+ WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
+ topCounts.setTopCount(10);
+ topCounts.setSlidingWindowWidth(600);
+ topCounts.setDagWindowWidth(1);
+
+ dag.addStream("TwittedHashtags", twitterFeed.hashtag, uniqueCounter.data).setLocality(locality);
+ // Count unique Hashtags
+ dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input);
+
+ TwitterTopCounterApplication.consoleOutput(dag, "topHashtags", topCounts.output, SNAPSHOT_SCHEMA, "hashtag");
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java
new file mode 100644
index 0000000..0c3f481
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.nio.ByteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * <p>URLSerDe class.</p>
+ *
+ * @since 0.3.2
+ */
+public class URLSerDe implements StreamCodec<byte[]>
+{
+ /**
+ * Covert the bytes into object useful for downstream node.
+ *
+ * @param fragment
+ * @return WindowedURLHolder object which represents the bytes.
+ */
+ @Override
+ public byte[] fromByteArray(Slice fragment)
+ {
+ if (fragment == null || fragment.buffer == null) {
+ return null;
+ } else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) {
+ return fragment.buffer;
+ } else {
+ byte[] buffer = new byte[fragment.buffer.length];
+ System.arraycopy(fragment.buffer, fragment.offset, buffer, 0, fragment.length);
+ return buffer;
+ }
+ }
+
+ /**
+ * Cast the input object to byte[].
+ *
+ * @param object - byte array representing the bytes of the string
+ * @return the same object as input
+ */
+ @Override
+ public Slice toByteArray(byte[] object)
+ {
+ return new Slice(object, 0, object.length);
+ }
+
+ @Override
+ public int getPartition(byte[] object)
+ {
+ ByteBuffer bb = ByteBuffer.wrap(object);
+ return bb.hashCode();
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(URLSerDe.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java
new file mode 100644
index 0000000..8ecd6ae
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ *
+ * WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size.
+ * The operator expects to receive a map object which contains a set of objects mapped to their respective frequency of
+ * occurrences. e.g. if we are looking at most commonly occurring names then the operator expects to receive the tuples
+ * of type Map<String, Intenger> on its input port, and at the end of the window it emits 1 object of type Map<String, Integer>
+ * with a pre determined size. The emitted object contains the most frequently occurring keys.
+ *
+ * @param <T> Type of the key in the map object which is accepted on input port as payload. Note that this key must be HashMap friendly.
+ * @since 0.3.2
+ */
+public class WindowedTopCounter<T> extends BaseOperator
+{
+ public static final String FIELD_TYPE = "type";
+ public static final String FIELD_COUNT = "count";
+
+ private static final Logger logger = LoggerFactory.getLogger(WindowedTopCounter.class);
+
+ private PriorityQueue<SlidingContainer<T>> topCounter;
+ private int windows;
+ private int topCount = 10;
+ private int slidingWindowWidth;
+ private int dagWindowWidth;
+ private HashMap<T, SlidingContainer<T>> objects = new HashMap<T, SlidingContainer<T>>();
+
+ /**
+ * Input port on which map objects containing keys with their respective frequency as values will be accepted.
+ */
+ public final transient DefaultInputPort<Map<T, Integer>> input = new DefaultInputPort<Map<T, Integer>>()
+ {
+ @Override
+ public void process(Map<T, Integer> map)
+ {
+ for (Map.Entry<T, Integer> e : map.entrySet()) {
+ SlidingContainer<T> holder = objects.get(e.getKey());
+ if (holder == null) {
+ holder = new SlidingContainer<T>(e.getKey(), windows);
+ objects.put(e.getKey(), holder);
+ }
+ holder.adjustCount(e.getValue());
+ }
+ }
+ };
+
+ public final transient DefaultOutputPort<List<Map<String, Object>>> output = new DefaultOutputPort<List<Map<String, Object>>>();
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ windows = (int)(slidingWindowWidth / dagWindowWidth) + 1;
+ if (slidingWindowWidth % dagWindowWidth != 0) {
+ logger.warn("slidingWindowWidth(" + slidingWindowWidth + ") is not exact multiple of dagWindowWidth(" + dagWindowWidth + ")");
+ }
+
+ topCounter = new PriorityQueue<SlidingContainer<T>>(this.topCount, new TopSpotComparator());
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ topCounter.clear();
+ }
+
+ @Override
+ public void endWindow()
+ {
+ Iterator<Map.Entry<T, SlidingContainer<T>>> iterator = objects.entrySet().iterator();
+ int i = topCount;
+
+ /*
+ * Try to fill the priority queue with the first topCount URLs.
+ */
+ SlidingContainer<T> holder;
+ while (iterator.hasNext()) {
+ holder = iterator.next().getValue();
+ holder.slide();
+
+ if (holder.totalCount == 0) {
+ iterator.remove();
+ } else {
+ topCounter.add(holder);
+ if (--i == 0) {
+ break;
+ }
+ }
+ }
+ logger.debug("objects.size(): {}", objects.size());
+
+ /*
+ * Make room for the new element in the priority queue by deleting the
+ * smallest one, if we KNOW that the new element is useful to us.
+ */
+ if (i == 0) {
+ int smallest = topCounter.peek().totalCount;
+ while (iterator.hasNext()) {
+ holder = iterator.next().getValue();
+ holder.slide();
+
+ if (holder.totalCount > smallest) {
+ topCounter.poll();
+ topCounter.add(holder);
+ smallest = topCounter.peek().totalCount;
+ } else if (holder.totalCount == 0) {
+ iterator.remove();
+ }
+ }
+ }
+
+ List<Map<String, Object>> data = Lists.newArrayList();
+
+ Iterator<SlidingContainer<T>> topIter = topCounter.iterator();
+
+ while (topIter.hasNext()) {
+ final SlidingContainer<T> wh = topIter.next();
+ Map<String, Object> tableRow = Maps.newHashMap();
+
+ tableRow.put(FIELD_TYPE, wh.identifier.toString());
+ tableRow.put(FIELD_COUNT, wh.totalCount);
+
+ data.add(tableRow);
+ }
+
+ Collections.sort(data, TwitterOutputSorter.INSTANCE);
+
+ output.emit(data);
+ topCounter.clear();
+ }
+
+ @Override
+ public void teardown()
+ {
+ topCounter = null;
+ objects = null;
+ }
+
+ /**
+ * Set the count of most frequently occurring keys to emit per map object.
+ *
+ * @param count count of the objects in the map emitted at the output port.
+ */
+ public void setTopCount(int count)
+ {
+ topCount = count;
+ }
+
+ public int getTopCount()
+ {
+ return topCount;
+ }
+
+ /**
+ * @return the windows
+ */
+ public int getWindows()
+ {
+ return windows;
+ }
+
+ /**
+ * @param windows the windows to set
+ */
+ public void setWindows(int windows)
+ {
+ this.windows = windows;
+ }
+
+ /**
+ * @return the slidingWindowWidth
+ */
+ public int getSlidingWindowWidth()
+ {
+ return slidingWindowWidth;
+ }
+
+ /**
+ * Set the width of the sliding window.
+ *
+ * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently
+ * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms,
+ * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000.
+ *
+ * @param slidingWindowWidth - Sliding window width to be set for this operator, recommended to be multiple of DAG window.
+ */
+ public void setSlidingWindowWidth(int slidingWindowWidth)
+ {
+ this.slidingWindowWidth = slidingWindowWidth;
+ }
+
+ /**
+ * @return the dagWindowWidth
+ */
+ public int getDagWindowWidth()
+ {
+ return dagWindowWidth;
+ }
+
+ /**
+ * Set the width of the sliding window.
+ *
+ * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently
+ * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms,
+ * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000.
+ *
+ * @param dagWindowWidth - DAG's native window width. It has to be the value of the native window set at the application level.
+ */
+ public void setDagWindowWidth(int dagWindowWidth)
+ {
+ this.dagWindowWidth = dagWindowWidth;
+ }
+
+ static class TopSpotComparator implements Comparator<SlidingContainer<?>>
+ {
+ @Override
+ public int compare(SlidingContainer<?> o1, SlidingContainer<?> o2)
+ {
+ if (o1.totalCount > o2.totalCount) {
+ return 1;
+ } else if (o1.totalCount < o2.totalCount) {
+ return -1;
+ }
+
+ return 0;
+ }
+ }
+
+ private static class TwitterOutputSorter implements Comparator<Map<String, Object>>
+ {
+ public static final TwitterOutputSorter INSTANCE = new TwitterOutputSorter();
+
+ private TwitterOutputSorter()
+ {
+ }
+
+ @Override
+ public int compare(Map<String, Object> o1, Map<String, Object> o2)
+ {
+ Integer count1 = (Integer)o1.get(FIELD_COUNT);
+ Integer count2 = (Integer)o2.get(FIELD_COUNT);
+
+ return count1.compareTo(count2);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif
new file mode 100644
index 0000000..d21e1d9
Binary files /dev/null and b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif differ
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java
new file mode 100644
index 0000000..956ff08
--- /dev/null
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Twitter example applications (top URLs, top words and trending hashtags).
+ */
+package org.apache.apex.examples.twitter;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml b/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
new file mode 100644
index 0000000..47b6531
--- /dev/null
+++ b/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml
@@ -0,0 +1,52 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!-- properties for the twitter kinesis example -->
+<configuration>
+
+ <property>
+ <name>dt.operator.FromKinesis.streamName</name>
+ <value>TwitterTag</value>
+ </property>
+ <property>
+ <name>dt.operator.FromKinesis.accessKey</name>
+ </property>
+ <property>
+ <name>dt.operator.FromKinesis.secretKey</name>
+ </property>
+ <property>
+ <name>dt.operator.FromKinesis.endPoint</name>
+ </property>
+ <property>
+ <name>dt.operator.ToKinesis.streamName</name>
+ <value>TwitterTag</value>
+ </property>
+ <property>
+ <name>dt.operator.ToKinesis.accessKey</name>
+ </property>
+ <property>
+ <name>dt.operator.ToKinesis.secretKey</name>
+ </property>
+ <property>
+ <name>dt.operator.ToKinesis.endPoint</name>
+ </property>
+
+</configuration>