You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:22 UTC
[17/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/META-INF/properties.xml b/demos/twitter/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index e3042fa..0000000
--- a/demos/twitter/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,121 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-<!-- properties for the Twitter demos: RollingTopWordsDemo, TwitterDemo and TwitterTrendingDemo -->
-<configuration>
- <property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1256</value>
- </property>
- <!-- default operator size 256MB -->
- <property>
- <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
- <value>256</value>
- </property>
- <property>
- <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
- <value>-Xmx128M</value>
- </property>
-
- <!-- default buffer memory 256MB -->
- <property>
- <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
- <value>256</value>
- </property>
- <property>
- <name>dt.operator.TweetSampler.consumerKey</name>
- </property>
- <property>
- <name>dt.operator.TweetSampler.consumerSecret</name>
- </property>
- <property>
- <name>dt.operator.TweetSampler.accessToken</name>
- </property>
- <property>
- <name>dt.operator.TweetSampler.accessTokenSecret</name>
- </property>
- <property>
- <name>dt.operator.TweetSampler.feedMultiplierVariance</name>
- <value>5</value>
- </property>
- <property>
- <name>dt.operator.TweetSampler.feedMultiplier</name>
- <value>20</value>
- </property>
-
- <!-- RollingTopWordsDemo -->
-
- <property>
- <name>dt.application.RollingTopWordsDemo.operator.TopCounter.topCount</name>
- <value>10</value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.stream.TweetStream.locality</name>
- <value>CONTAINER_LOCAL</value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
- <value>TwitterWordsQuery</value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.operator.QueryResult.topic</name>
- <value>TwitterWordsQueryResult</value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.operator.QueryResult.numRetries</name>
- <value>2147483647</value>
- </property>
-
- <!-- TwitterDemo -->
-
- <property>
- <name>dt.application.TwitterDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
- <value>TwitterURLQuery</value>
- </property>
- <property>
- <name>dt.application.TwitterDemo.operator.QueryResult.topic</name>
- <value>TwitterURLQueryResult</value>
- </property>
- <property>
- <name>dt.application.TwitterDemo.operator.QueryResult.numRetries</name>
- <value>2147483647</value>
- </property>
- <property>
- <name>dt.application.TwitterDemo.operator.UniqueURLCounter.attr.APPLICATION_WINDOW_COUNT</name>
- <value>60</value>
- </property>
-
- <!-- TwitterTrendingDemo -->
-
- <property>
- <name>dt.application.TwitterTrendingDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
- <value>TwitterHashtagQueryDemo</value>
- </property>
- <property>
- <name>dt.application.TwitterTrendingDemo.operator.QueryResult.topic</name>
- <value>TwitterHashtagQueryResultDemo</value>
- </property>
- <property>
- <name>dt.application.TwitterTrendingDemo.operator.QueryResult.numRetries</name>
- <value>2147483647</value>
- </property>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/mysql.sql
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/mysql.sql b/demos/twitter/src/main/resources/mysql.sql
deleted file mode 100644
index e0b97dd..0000000
--- a/demos/twitter/src/main/resources/mysql.sql
+++ /dev/null
@@ -1,35 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements. See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership. The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing,
--- software distributed under the License is distributed on an
--- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
--- KIND, either express or implied. See the License for the
--- specific language governing permissions and limitations
--- under the License.
---
-
-DROP TABLE if exists tweets;
-CREATE TABLE tweets (
- window_id LONG NOT NULL,
- creation_date DATE,
- text VARCHAR(256) NOT NULL,
- userid VARCHAR(40) NOT NULL,
- KEY ( userid, creation_date)
- );
-
-drop table if exists dt_window_id_tracker;
-CREATE TABLE dt_window_id_tracker (
- dt_application_id VARCHAR(100) NOT NULL,
- dt_operator_id int(11) NOT NULL,
- dt_window_id bigint NOT NULL,
- UNIQUE (dt_application_id, dt_operator_id, dt_window_id)
-) ENGINE=MyISAM DEFAULT CHARSET=latin1;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/top_urls.tplg.properties
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/top_urls.tplg.properties b/demos/twitter/src/main/resources/top_urls.tplg.properties
deleted file mode 100644
index c106d7d..0000000
--- a/demos/twitter/src/main/resources/top_urls.tplg.properties
+++ /dev/null
@@ -1,48 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-stram.node.twitterfeed.classname=com.datatorrent.example.twitter.TwitterSampleInput
-
-stram.stream.status.source=twitterfeed.output
-stram.stream.status.sinks=urlextractor.input
-
-stram.node.urlextractor.classname=com.datatorrent.example.twitter.TwitterStatusURLExtractor
-
-stram.stream.collapsedurls.source=urlextractor.output
-stram.stream.collapsedurls.sinks=
-
-stram.node.
-stram.stream.partitionedtf.input=twitterfeed.output
-stram.stream.partitionedtf.output=partitioned_counter.input
-stram.stream.partitionedtf.serdeClassname=com.datatorrent.example.twitter.URLSerDe
-
-stram.node.partitioned_counter.classname=com.datatorrent.example.twitter.PartitionedCounter
-stram.node.partitioned_counter.topCount=10
-
-stram.stream.merge_stream.input=partitioned_counter.output
-stram.stream.merge_stream.output=merge_counter.input
-stram.stream.merge_stream.serdeClassname=com.datatorrent.example.twitter.URLHolderSerde
-
-stram.node.merge_counter.classname=com.datatorrent.example.twitter.MergeSorter
-stram.node.merge_counter.topCount=10
-
-stram.stream.merged_stream.input=merge_counter.output
-stram.stream.merged_stream.output=console.input
-
-stram.node.console.classname=com.datatorrent.stream.ConsoleOutputStream
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/twitterHashTagDataSchema.json
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/twitterHashTagDataSchema.json b/demos/twitter/src/main/resources/twitterHashTagDataSchema.json
deleted file mode 100644
index 0c9296c..0000000
--- a/demos/twitter/src/main/resources/twitterHashTagDataSchema.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
- "values": [{"name": "hashtag", "type": "string"},
- {"name": "count", "type": "integer"}]
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/twitterURLDataSchema.json
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/twitterURLDataSchema.json b/demos/twitter/src/main/resources/twitterURLDataSchema.json
deleted file mode 100644
index ecf723e..0000000
--- a/demos/twitter/src/main/resources/twitterURLDataSchema.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
- "values": [{"name": "url", "type": "string"},
- {"name": "count", "type": "integer"}]
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/twitterWordDataSchema.json
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/twitterWordDataSchema.json b/demos/twitter/src/main/resources/twitterWordDataSchema.json
deleted file mode 100644
index 5e8e7c0..0000000
--- a/demos/twitter/src/main/resources/twitterWordDataSchema.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
- "values": [{"name": "word", "type": "string"},
- {"name": "count", "type": "integer"}]
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
deleted file mode 100644
index cd211ff..0000000
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import static org.junit.Assert.assertEquals;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-
-/**
- * Test for the application that taps into Twitter's sample input stream and dumps all the
- * tweets into a database; checks that the prepared DAG equals the DAG cloned by LocalMode.
- */
-public class TwitterDumpApplicationTest
-{
- @Test
- public void testPopulateDAG() throws Exception
- {
- Configuration configuration = new Configuration(false);
-
- LocalMode lm = LocalMode.newInstance();
- DAG prepareDAG = lm.prepareDAG(new TwitterDumpApplication(), configuration);
- DAG clonedDAG = lm.cloneDAG();
-
- assertEquals("Serialization", prepareDAG, clonedDAG);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
deleted file mode 100644
index 91a4e20..0000000
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-
-/**
- * Populates the {@code TwitterTopCounterApplication} DAG and runs it in local mode.
- */
-public class TwitterTopCounterTest
-{
- /**
- * Builds the DAG with new Configuration(false) and calls run(120000) on the local controller.
- * Requires Twitter authentication setup and is skipped by default (see {@link TwitterSampleInput}).
- *
- * @throws Exception propagated from populating the DAG or from the local-mode run
- */
- @Test
- public void testApplication() throws Exception
- {
- LocalMode lma = LocalMode.newInstance();
- new TwitterTopCounterApplication().populateDAG(lma.getDAG(), new Configuration(false));
- LocalMode.Controller lc = lma.getController();
- lc.run(120000);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
deleted file mode 100644
index 4ac2e8d..0000000
--- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.twitter;
-
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-
-/**
- * Prepares the {@code TwitterTopWordsApplication} DAG and runs it in local mode.
- */
-public class TwitterTopWordsTest
-{
- /**
- * Prepares the DAG with dt-site-rollingtopwords.xml added as a resource and calls run(120000).
- * Requires Twitter authentication setup and is skipped by default (see {@link TwitterSampleInput}).
- *
- * @throws Exception propagated from preparing the DAG or from the local-mode run
- */
- @Test
- public void testApplication() throws Exception
- {
- TwitterTopWordsApplication app = new TwitterTopWordsApplication();
- Configuration conf = new Configuration(false);
- conf.addResource("dt-site-rollingtopwords.xml");
- LocalMode lma = LocalMode.newInstance();
- lma.prepareDAG(app, conf);
- LocalMode.Controller lc = lma.getController();
- lc.run(120000);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/resources/dt-site-rollingtopwords.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/resources/dt-site-rollingtopwords.xml b/demos/twitter/src/test/resources/dt-site-rollingtopwords.xml
deleted file mode 100644
index b1d4153..0000000
--- a/demos/twitter/src/test/resources/dt-site-rollingtopwords.xml
+++ /dev/null
@@ -1,73 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
-<property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1024</value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.class</name>
- <value>com.datatorrent.demos.twitter.TwitterTopWordsApplication</value>
- <description>An alias for the application</description>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.operator.TopCounter.topCount
- </name>
- <value>10</value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.stream.TweetStream.locality
- </name>
- <value>CONTAINER_LOCAL</value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.stream.TwittedWords.locality
- </name>
- <value></value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.stream.UniqueWordCounts.locality
- </name>
- <value></value>
- </property>
- <property>
- <name>dt.application.RollingTopWordsDemo.stream.TopWords.locality</name>
- <value></value>
- </property>
- <property>
- <name>dt.operator.TweetSampler.consumerKey</name>
- <value>r1DqM35iCTjbgLHf1R7rDbF5R</value>
- </property>
- <property>
- <name>dt.operator.TweetSampler.consumerSecret</name>
- <value>KBpZiR0glPzvZPm1Sa7sq9MCGQ2H2DrVChNtmYQcU75fwuHjed</value>
- </property>
- <property>
- <name>dt.operator.TweetSampler.accessToken</name>
- <value>2490355837-lXKev9vIGzftDjDu9LlyQiuGAqjYyPELFRyRpQo</value>
- </property>
- <property>
- <name>dt.operator.TweetSampler.accessTokenSecret</name>
- <value>lmAxZlFhcBqeTxmyatFT43fzshzrv6lsOAtHDsCBLjiuk</value>
- </property>
-
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/twitter/src/test/resources/log4j.properties b/demos/twitter/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/twitter/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/pom.xml
----------------------------------------------------------------------
diff --git a/demos/uniquecount/pom.xml b/demos/uniquecount/pom.xml
deleted file mode 100644
index 7b402fc..0000000
--- a/demos/uniquecount/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>uniquecount</artifactId>
- <packaging>jar</packaging>
-
- <name>Apache Apex Malhar Unique Count Demo</name>
- <description></description>
-
- <parent>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-demos</artifactId>
- <version>3.7.0-SNAPSHOT</version>
- </parent>
-
- <properties>
- <skipTests>true</skipTests>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.1</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/assemble/appPackage.xml b/demos/uniquecount/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/uniquecount/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>appPackage</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/</directory>
- <outputDirectory>/app</outputDirectory>
- <includes>
- <include>${project.artifactId}-${project.version}.jar</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/target/deps</directory>
- <outputDirectory>/lib</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/site/conf</directory>
- <outputDirectory>/conf</outputDirectory>
- <includes>
- <include>*.xml</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/META-INF</directory>
- <outputDirectory>/META-INF</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/app</directory>
- <outputDirectory>/app</outputDirectory>
- </fileSet>
- </fileSets>
-
-</assembly>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
deleted file mode 100644
index 57ef1a1..0000000
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.uniquecount;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.lib.algo.UniqueCounter;
-import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.stream.Counter;
-import com.datatorrent.lib.stream.StreamDuplicater;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/**
- * Application to demonstrate a partitioned {@link UniqueCounter} operator. <br>
- * The input operator generates random keys, which are sent to a
- * UniqueCounter initially partitioned into three partitions (StatelessPartitioner) to
- * test unifier functionality; the output is converted, duplicated to the console and to a
- * verifier, whose success and failure ports feed counters that print to the console.
- *
- * @since 1.0.2
- */
-@ApplicationAnnotation(name = "UniqueValueCountDemo")
-public class Application implements StreamingApplication
-{
-
- @Override
- public void populateDAG(DAG dag, Configuration entries)
- {
- /* Generate random key-value pairs */
- RandomKeysGenerator randGen = dag.addOperator("randomgen", new RandomKeysGenerator());
-
-
- /* Initialize with three partitions to start with */
- // UniqueCount1 uniqCount = dag.addOperator("uniqevalue", new UniqueCount1());
- UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new UniqueCounter<Integer>());
-
- MapToKeyHashValuePairConverter<Integer, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
-
- uniqCount.setCumulative(false);
- dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<Integer>>(3));
-
- CountVerifier<Integer> verifier = dag.addOperator("verifier", new CountVerifier<Integer>());
- StreamDuplicater<KeyHashValPair<Integer, Integer>> dup = dag.addOperator("dup", new StreamDuplicater<KeyHashValPair<Integer, Integer>>());
- ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
-
- ConsoleOutputOperator successOutput = dag.addOperator("successoutput", new ConsoleOutputOperator());
- successOutput.setStringFormat("Success %d");
- ConsoleOutputOperator failureOutput = dag.addOperator("failureoutput", new ConsoleOutputOperator());
- failureOutput.setStringFormat("Failure %d");
-
- // success and failure counters.
- Counter successcounter = dag.addOperator("successcounter", new Counter());
- Counter failurecounter = dag.addOperator("failurecounter", new Counter());
-
- dag.addStream("datain", randGen.outPort, uniqCount.data);
- dag.addStream("dataverification0", randGen.verificationPort, verifier.in1);
- dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL);
- dag.addStream("split", converter.output, dup.data);
- dag.addStream("consoutput", dup.out1, output.input);
- dag.addStream("dataverification1", dup.out2, verifier.in2);
- dag.addStream("successc", verifier.successPort, successcounter.input);
- dag.addStream("failurec", verifier.failurePort, failurecounter.input);
- dag.addStream("succconsoutput", successcounter.output, successOutput.input);
- dag.addStream("failconsoutput", failurecounter.output, failureOutput.input);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
deleted file mode 100644
index d201037..0000000
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.uniquecount;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/*
-Compare results and print non-matching values to console.
- */
-/**
- * <p>CountVerifier class.</p>
- *
- * @since 1.0.2
- */
-public class CountVerifier<K> implements Operator
-{
- HashMap<K, Integer> map1 = new HashMap<K, Integer>();
- HashMap<K, Integer> map2 = new HashMap<K, Integer>();
-
- public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in1 = new DefaultInputPort<KeyHashValPair<K, Integer>>()
- {
- @Override
- public void process(KeyHashValPair<K, Integer> tuple)
- {
- processTuple(tuple, map1);
- }
- };
-
- public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in2 = new DefaultInputPort<KeyHashValPair<K, Integer>>()
- {
- @Override
- public void process(KeyHashValPair<K, Integer> tuple)
- {
- processTuple(tuple, map2);
- }
- };
-
- void processTuple(KeyHashValPair<K, Integer> tuple, HashMap<K, Integer> map)
- {
- map.put(tuple.getKey(), tuple.getValue());
- }
-
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>();
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>();
-
- @Override
- public void beginWindow(long l)
- {
-
- }
-
- @Override
- public void endWindow()
- {
- int failureCount = 0;
- for (Map.Entry<K, Integer> e : map1.entrySet()) {
- K key = e.getKey();
- int val = map2.get(key);
- if (val != e.getValue()) {
- failureCount++;
- }
- }
- if (failureCount != 0) {
- failurePort.emit(failureCount);
- } else {
- successPort.emit(map1.size());
- }
- }
-
- @Override
- public void setup(Context.OperatorContext operatorContext)
- {
-
- }
-
- @Override
- public void teardown()
- {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
deleted file mode 100644
index e806759..0000000
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.uniquecount;
-
-import java.util.HashMap;
-import java.util.Random;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Generate random Key value pairs.
- * key is string and value is int, it emits the pair as KeyValPair on outPort,
- *
- * @since 1.0.2
- */
-public class RandomDataGenerator implements InputOperator
-{
- public final transient DefaultOutputPort<KeyValPair<String, Object>> outPort = new DefaultOutputPort<KeyValPair<String, Object>>();
- private HashMap<String, Integer> dataInfo;
- private final transient Logger LOG = LoggerFactory.getLogger(RandomDataGenerator.class);
- private int count;
- private int sleepMs = 10;
- private int keyRange = 100;
- private int valueRange = 100;
- private long tupleBlast = 10000;
- private Random random;
-
- public RandomDataGenerator()
- {
- random = new Random();
- }
-
- @Override
- public void emitTuples()
- {
- for (int i = 0; i < tupleBlast; i++) {
- String key = String.valueOf(random.nextInt(keyRange));
- int val = random.nextInt(valueRange);
- outPort.emit(new KeyValPair<String, Object>(key, val));
- }
- try {
- Thread.sleep(sleepMs);
- } catch (Exception ex) {
- LOG.error(ex.getMessage());
- }
- count++;
- }
-
- public int getSleepMs()
- {
- return sleepMs;
- }
-
- public void setSleepMs(int sleepMs)
- {
- this.sleepMs = sleepMs;
- }
-
- public long getTupleBlast()
- {
- return tupleBlast;
- }
-
- public void setTupleBlast(long tupleBlast)
- {
- this.tupleBlast = tupleBlast;
- }
-
- @Override
- public void beginWindow(long l)
- {
-
- }
-
- @Override
- public void endWindow()
- {
- LOG.debug("emitTuples called " + count + " times in this window");
- count = 0;
- }
-
- @Override
- public void setup(Context.OperatorContext operatorContext)
- {
-
- }
-
- @Override
- public void teardown()
- {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
deleted file mode 100644
index 28f3bc0..0000000
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.uniquecount;
-
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Input port operator for generating random values on keys. <br>
- * Key(s) : key + integer in range between 0 and numKeys <br>
- * Value(s) : integer in range of 0 to numValuesPerKeys <br>
- *
- * @since 0.9.3
- */
-public class RandomKeyValues implements InputOperator
-{
- public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>();
- private Random random = new Random(11111);
- private int numKeys;
- private int numValuesPerKeys;
- private int tuppleBlast = 1000;
- private int emitDelay = 20; /* 20 ms */
-
- /* For verification */
- private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>();
-
- public RandomKeyValues()
- {
- this.numKeys = 100;
- this.numValuesPerKeys = 100;
- }
-
- public RandomKeyValues(int keys, int values)
- {
- this.numKeys = keys;
- this.numValuesPerKeys = values;
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void endWindow()
- {
- }
-
- @Override
- public void setup(OperatorContext context)
- {
- }
-
- @Override
- public void teardown()
- {
- }
-
- @Override
- public void emitTuples()
- {
- /* generate tuples randomly, */
- for (int i = 0; i < tuppleBlast; i++) {
- int intKey = random.nextInt(numKeys);
- String key = "key" + String.valueOf(intKey);
- int value = random.nextInt(numValuesPerKeys);
-
- // update history for verifying later.
- BitSet bmap = history.get(intKey);
- if (bmap == null) {
- bmap = new BitSet();
- history.put(intKey, bmap);
- }
- bmap.set(value);
-
- // emit the key with value.
- outport.emit(new KeyValPair<String, Object>(key, value));
- }
- try {
- Thread.sleep(emitDelay);
- } catch (Exception e) {
- // Ignore.
- }
- }
-
- public int getNumKeys()
- {
- return numKeys;
- }
-
- public void setNumKeys(int numKeys)
- {
- this.numKeys = numKeys;
- }
-
- public int getNumValuesPerKeys()
- {
- return numValuesPerKeys;
- }
-
- public void setNumValuesPerKeys(int numValuesPerKeys)
- {
- this.numValuesPerKeys = numValuesPerKeys;
- }
-
- public int getTuppleBlast()
- {
- return tuppleBlast;
- }
-
- public void setTuppleBlast(int tuppleBlast)
- {
- this.tuppleBlast = tuppleBlast;
- }
-
- public int getEmitDelay()
- {
- return emitDelay;
- }
-
- public void setEmitDelay(int emitDelay)
- {
- this.emitDelay = emitDelay;
- }
-
- public void debug()
- {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
deleted file mode 100644
index eb9d22c..0000000
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.uniquecount;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/*
- Generate random keys.
- */
-/**
- * <p>RandomKeysGenerator class.</p>
- *
- * @since 1.0.2
- */
-public class RandomKeysGenerator implements InputOperator
-{
-
- protected int numKeys = 100;
- protected int tupleBlast = 15000;
- protected long sleepTime = 0;
- protected Map<Integer, MutableInt> history = new HashMap<Integer, MutableInt>();
- private Random random = new Random();
- private Date date = new Date();
- private long start;
-
- @OutputPortFieldAnnotation(optional = false)
- public transient DefaultOutputPort<Integer> outPort = new DefaultOutputPort<Integer>();
-
- @OutputPortFieldAnnotation(optional = true)
- public transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> verificationPort =
- new DefaultOutputPort<KeyHashValPair<Integer, Integer>>();
-
- @Override
- public void emitTuples()
- {
- for (int i = 0; i < tupleBlast; i++) {
- int key = random.nextInt(numKeys);
- outPort.emit(key);
-
-
- if (verificationPort.isConnected()) {
- // maintain history for later verification.
- MutableInt count = history.get(key);
- if (count == null) {
- count = new MutableInt(0);
- history.put(key, count);
- }
- count.increment();
- }
-
- }
- try {
- if (sleepTime != 0) {
- Thread.sleep(sleepTime);
- }
- } catch (Exception ex) {
- // Ignore.
- }
- }
-
- public RandomKeysGenerator()
- {
- start = date.getTime();
- }
-
- @Override
- public void beginWindow(long l)
- {
-
- }
-
- @Override
- public void endWindow()
- {
-
- if (verificationPort.isConnected()) {
- for (Map.Entry<Integer, MutableInt> e : history.entrySet()) {
- verificationPort.emit(new KeyHashValPair<Integer, Integer>(e.getKey(), e.getValue().toInteger()));
- }
- history.clear();
- }
-
- }
-
- @Override
- public void setup(Context.OperatorContext operatorContext)
- {
-
- }
-
- @Override
- public void teardown()
- {
-
- }
-
- public int getNumKeys()
- {
- return numKeys;
- }
-
- public void setNumKeys(int numKeys)
- {
- this.numKeys = numKeys;
- }
-
- public int getTupleBlast()
- {
- return tupleBlast;
- }
-
- public void setTupleBlast(int tupleBlast)
- {
- this.tupleBlast = tupleBlast;
- }
-
- public long getSleepTime()
- {
- return sleepTime;
- }
-
- public void setSleepTime(long sleepTime)
- {
- this.sleepTime = sleepTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
deleted file mode 100644
index eb9e392..0000000
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.uniquecount;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-
-import com.datatorrent.lib.algo.UniqueCounter;
-import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * <p>UniqueKeyValCountDemo class.</p>
- *
- * @since 1.0.2
- */
-@ApplicationAnnotation(name = "UniqueKeyValueCountDemo")
-public class UniqueKeyValCountDemo implements StreamingApplication
-{
-
- @Override
- public void populateDAG(DAG dag, Configuration entries)
- {
- /* Generate random key-value pairs */
- RandomDataGenerator randGen = dag.addOperator("randomgen", new RandomDataGenerator());
-
- /* Initialize with three partition to start with */
- UniqueCounter<KeyValPair<String, Object>> uniqCount =
- dag.addOperator("uniqevalue", new UniqueCounter<KeyValPair<String, Object>>());
- MapToKeyHashValuePairConverter<KeyValPair<String, Object>, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
- uniqCount.setCumulative(false);
- dag.setAttribute(randGen, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<KeyValPair<String, Object>>>(3));
-
- ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
-
- dag.addStream("datain", randGen.outPort, uniqCount.data);
- dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL);
- dag.addStream("consoutput", converter.output, output.input);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/package-info.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/package-info.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/package-info.java
deleted file mode 100644
index 6f81c0d..0000000
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- Demo Application for new Paritionable UniqueCount Operator.
- */
-package com.datatorrent.demos.uniquecount;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/main/resources/META-INF/properties.xml b/demos/uniquecount/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 8742328..0000000
--- a/demos/uniquecount/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <!--
- <property>
- <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
- <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
- </property>
- -->
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/site/conf/my-app-conf1.xml
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/site/conf/my-app-conf1.xml b/demos/uniquecount/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index f35873b..0000000
--- a/demos/uniquecount/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1024</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
deleted file mode 100644
index 66a0af1..0000000
--- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.uniquecount;
-
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class ApplicationTest
-{
- @Test
- public void testApplication() throws Exception
- {
- LocalMode lma = LocalMode.newInstance();
- new Application().populateDAG(lma.getDAG(), new Configuration(false));
- LocalMode.Controller lc = lma.getController();
- lc.run(10000);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
deleted file mode 100644
index a198247..0000000
--- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.uniquecount;
-
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-
-/**
- * Test the DAG declaration in local mode.
- */
-public class UniqueKeyValDemoTest
-{
- @Test
- public void testApplication() throws Exception
- {
- LocalMode lma = LocalMode.newInstance();
- new UniqueKeyValCountDemo().populateDAG(lma.getDAG(), new Configuration(false));
- LocalMode.Controller lc = lma.getController();
- lc.run(10000);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/uniquecount/src/test/resources/log4j.properties b/demos/uniquecount/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/uniquecount/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/pom.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/pom.xml b/demos/wordcount/pom.xml
deleted file mode 100644
index 410daea..0000000
--- a/demos/wordcount/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>wordcount-demo</artifactId>
- <packaging>jar</packaging>
-
- <name>Apache Apex Malhar Wordcount Demo</name>
- <description>A very simple application that demonstrates Apex Platform\u2019s streaming window feature.</description>
-
- <parent>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-demos</artifactId>
- <version>3.7.0-SNAPSHOT</version>
- </parent>
-
- <properties>
- <skipTests>true</skipTests>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>it.unimi.dsi</groupId>
- <artifactId>fastutil</artifactId>
- <version>6.6.4</version>
- </dependency>
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/assemble/appPackage.xml b/demos/wordcount/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/wordcount/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>appPackage</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/</directory>
- <outputDirectory>/app</outputDirectory>
- <includes>
- <include>${project.artifactId}-${project.version}.jar</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/target/deps</directory>
- <outputDirectory>/lib</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/site/conf</directory>
- <outputDirectory>/conf</outputDirectory>
- <includes>
- <include>*.xml</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/META-INF</directory>
- <outputDirectory>/META-INF</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/app</directory>
- <outputDirectory>/app</outputDirectory>
- </fileSet>
- </fileSets>
-
-</assembly>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
deleted file mode 100644
index d0512cf..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.algo.UniqueCounter;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-
-/**
- * Simple Word Count Demo : <br>
- * This is application to count total occurrence of each word from file or any
- * stream. <br>
- * <br>
- *
- * Functional Description : <br>
- * This demo declares custom input operator to read data file set by user. <br>
- * This input operator can be replaced by any stream input operator. <br>
- * <br>
- *
- * Custom Attribute(s) : None <br>
- * <br>
- *
- * Input Adapter : <br>
- * Word input operator opens user specified data file and streams each line to
- * application. <br>
- * <br>
- *
- * Output Adapter : <br>
- * Output values are written to console through ConsoleOutputOerator<br>
- * If needed you can use other output adapters<br>
- * <br>
- * <p>
- * Running Java Test or Main app in IDE:
- *
- * <pre>
- * LocalMode.runApp(new Application(), 600000); // 10 min run
- * </pre>
- *
- * Run Success : <br>
- * For successful deployment and run, user should see following output on console: <br>
- * </pre>
- * {developed=1} {bush’s=2} {roster=1} {council=1} {mankiw=1} {academia=1}
- * {of=6} {help=1} {are=1} {presidential=1}
- * </pre> <br> <br>
- *
- * Scaling Options : <br>
- * This operator app can not be scaled, please look at implementation {@link com.datatorrent.lib.algo.UniqueCounterEach} <br> <br>
- *
- * Application DAG : <br>
- * <img src="doc-files/UniqueWordCounter.jpg" width=600px > <br>
- *
- * Streaming Window Size : 500ms
- * Operator Details : <br>
- * <ul>
- * <li>
- * <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application.
- * This can replaced by any input stream by user. <br>
- * Class : {@link com.datatorrent.demos.wordcount.WordCountInputOperator} <br>
- * Operator Application Window Count : 1 <br>
- * StateFull : No
- * </li>
- * <li>
- * <p><b> The operator count : </b> This operator aggregates unique key count over one window count(app). <br>
- * Class : {@link com.datatorrent.lib.algo.UniqueCounterEach} <br>
- * Operator Application Window Count : 1 <br>
- * StateFull : No
- * </li>
- * <li>
- * <p><b>The operator Console: </b> This operator just outputs the input tuples to the console (or stdout).
- * This case it emits unique count of each word over 500ms.
- * </li>
- * </ul>
- *
- * @since 0.3.2
- */
-@ApplicationAnnotation(name = "WordCountDemo")
-public class Application implements StreamingApplication
-{
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- WordCountInputOperator input = dag.addOperator("wordinput", new WordCountInputOperator());
- UniqueCounter<String> wordCount = dag.addOperator("count", new UniqueCounter<String>());
- dag.addStream("wordinput-count", input.outputPort, wordCount.data);
- ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator());
- dag.addStream("count-console",wordCount.count, consoleOperator.input);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
deleted file mode 100644
index 7e5bb93..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.net.URI;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-import com.datatorrent.lib.appdata.schemas.SchemaUtils;
-import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
-import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
-
-/**
- * Simple demo that computes word frequencies from any file dropped into a
- * monitored directory. It outputs the top N word-frequency pairs for each file
- * as well globally across all files.
- * <p>
- * Each input file generates a corresponding output file in the output directory
- * containing the top N pairs for that file. The output is also written
- * to an internal store to support visualization in the UI via queries.
- * <p>
- * @since 3.2.0
- */
-@ApplicationAnnotation(name = "TopNWordsWithQueries")
-public class ApplicationWithQuerySupport implements StreamingApplication
-{
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class);
-
- /**
- * Name of schema file.
- */
- public static final String SNAPSHOT_SCHEMA = "WordDataSchema.json";
-
- /**
- * Populates the DAG with operators and connecting streams
- *
- * @param dag The directed acyclic graph of operators to populate
- * @param conf The configuration
- */
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- // create operators
- LineReader lineReader = dag.addOperator("lineReader", new LineReader());
- WordReader wordReader = dag.addOperator("wordReader", new WordReader());
- WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount());
- FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount());
- WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter());
- ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
- console.setStringFormat("wordCount: %s");
-
- // create streams
-
- dag.addStream("lines", lineReader.output, wordReader.input);
- dag.addStream("control", lineReader.control, fileWordCount.control);
- dag.addStream("words", wordReader.output, windowWordCount.input);
- dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
- dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
-
- String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-
- if (!StringUtils.isEmpty(gatewayAddress)) { // add query support
- URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
-
- AppDataSnapshotServerMap snapshotServerFile
- = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap());
- AppDataSnapshotServerMap snapshotServerGlobal
- = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap());
-
- String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
- snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON);
- snapshotServerGlobal.setSnapshotSchemaJSON(snapshotServerJSON);
-
- PubSubWebSocketAppDataQuery wsQueryFile = new PubSubWebSocketAppDataQuery();
- PubSubWebSocketAppDataQuery wsQueryGlobal = new PubSubWebSocketAppDataQuery();
- wsQueryFile.setUri(uri);
- wsQueryGlobal.setUri(uri);
-
- snapshotServerFile.setEmbeddableQueryInfoProvider(wsQueryFile);
- snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal);
-
- PubSubWebSocketAppDataResult wsResultFile
- = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult());
- PubSubWebSocketAppDataResult wsResultGlobal
- = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult());
- wsResultFile.setUri(uri);
- wsResultGlobal.setUri(uri);
-
- Operator.InputPort<String> queryResultFilePort = wsResultFile.input;
- Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input;
-
- dag.addStream("WordCountsFile", fileWordCount.outputPerFile, snapshotServerFile.input, console.input);
- dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal, snapshotServerGlobal.input);
-
- dag.addStream("ResultFile", snapshotServerFile.queryResult, queryResultFilePort);
- dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, queryResultGlobalPort);
- } else {
- //throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS");
- dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input);
- }
-
- LOG.info("done with populateDAG, isDebugEnabled = " + LOG.isDebugEnabled());
- LOG.info("Returning from populateDAG");
- }
-
-}