Posted to commits@apex.apache.org by da...@apache.org on 2015/08/29 03:10:51 UTC

[01/25] incubator-apex-malhar git commit: MLHR-1796 - #resolve Fixed compilation errors in benchmark application.

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/feature-AppData 7e89a86f4 -> 59622483e (forced update)


MLHR-1796 - #resolve Fixed compilation errors in benchmark application.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a980e061
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a980e061
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a980e061

Branch: refs/heads/feature-AppData
Commit: a980e061dffb14b02d4426e59f5e3cdeaa0e2b24
Parents: 0be7372
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Tue Aug 4 20:54:24 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Aug 4 20:54:24 2015 -0700

----------------------------------------------------------------------
 .../algo/UniqueValueCountBenchmarkApplication.java          | 9 +++++----
 .../benchmark/script/RubyOperatorBenchmarkApplication.java  | 2 +-
 .../benchmark/memsql/MemsqlInputBenchmarkTest.java          | 2 +-
 .../datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java | 6 +++---
 4 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a980e061/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
index 1109a85..2f25130 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
@@ -19,10 +19,11 @@ package com.datatorrent.benchmark.algo;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.lib.algo.UniqueCounter;
-import com.datatorrent.lib.algo.UniqueCounterValue;
 import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.common.partitioner.StatelessPartitioner;
+
+import com.datatorrent.lib.stream.Counter;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
 
 import com.datatorrent.api.Context;
@@ -65,12 +66,12 @@ public class UniqueValueCountBenchmarkApplication implements StreamingApplicatio
     dag.setInputPortAttribute(uniqCount.data, Context.PortContext.PARTITION_PARALLEL, true);
     uniqCount.setCumulative(false);
 
-    UniqueCounterValue counter = dag.addOperator("count", new UniqueCounterValue());
+    Counter counter = dag.addOperator("count", new Counter());
     ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
 
     dag.addStream("datain", randGen.integer_data, uniqCount.data);
     dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL);
-    dag.addStream("consoutput", converter.output, counter.data);
-    dag.addStream("final", counter.count, output.input);
+    dag.addStream("consoutput", converter.output, counter.input);
+    dag.addStream("final", counter.output, output.input);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a980e061/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
index db998b2..c7e09b4 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
@@ -22,7 +22,7 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.benchmark.RandomMapOutput;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.script.RubyOperator;
+import com.datatorrent.contrib.ruby.RubyOperator;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
 
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a980e061/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
index 580e3da..abc86a1 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
@@ -53,7 +53,7 @@ public class MemsqlInputBenchmarkTest
 
     AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore);
 
-    MemsqlOutputOperator outputOperator = new MemsqlOutputOperator();
+    MemsqlPOJOOutputOperator outputOperator = new MemsqlPOJOOutputOperator();
     outputOperator.getStore().setDatabaseUrl(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl"));
     outputOperator.getStore().setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
     outputOperator.setBatchSize(BATCH_SIZE);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a980e061/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
index 9195a45..04c0cb0 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
@@ -19,7 +19,7 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.memsql.MemsqlOutputOperator;
+import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
@@ -86,8 +86,8 @@ public class MemsqlOutputBenchmark implements StreamingApplication
     randomEventGenerator.setTuplesBlast(TUPLE_BLAST);
 
     LOG.debug("Before making output operator");
-    MemsqlOutputOperator memsqlOutputOperator = dag.addOperator("memsqlOutputOperator",
-                                                                new MemsqlOutputOperator());
+    MemsqlPOJOOutputOperator memsqlOutputOperator = dag.addOperator("memsqlOutputOperator",
+                                                                new MemsqlPOJOOutputOperator());
     LOG.debug("After making output operator");
 
     memsqlOutputOperator.setBatchSize(DEFAULT_BATCH_SIZE);


[13/25] incubator-apex-malhar git commit: Merge pull request #1534 from ishark/redisStore

Posted by da...@apache.org.
Merge pull request #1534 from ishark/redisStore

MLHR-1748 #resolve #comment Created concrete input and output operators for Redis Store

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/717168bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/717168bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/717168bc

Branch: refs/heads/feature-AppData
Commit: 717168bc8768d4376b5f724fb7a31c975878aea5
Parents: 0b3bb88 ada42ab
Author: Chandni Singh <si...@gmail.com>
Authored: Fri Aug 14 15:17:27 2015 -0700
Committer: Chandni Singh <si...@gmail.com>
Committed: Fri Aug 14 15:17:27 2015 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 |   8 +-
 .../redis/AbstractRedisInputOperator.java       | 225 +++++++++++++++++-
 .../redis/RedisKeyValueInputOperator.java       |  55 +++++
 .../redis/RedisMapAsValueInputOperator.java     |  45 ++++
 .../contrib/redis/RedisPOJOInputOperator.java   | 204 ++++++++++++++++
 .../contrib/redis/RedisPOJOOutputOperator.java  | 155 +++++++++++++
 .../datatorrent/contrib/redis/RedisStore.java   |  27 +++
 .../contrib/redis/RedisInputOperatorTest.java   | 193 ++++++++++++++++
 .../contrib/redis/RedisPOJOOperatorTest.java    | 230 +++++++++++++++++++
 demos/machinedata/pom.xml                       |   2 +-
 10 files changed, 1138 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[17/25] incubator-apex-malhar git commit: update japicmp plugin version

Posted by da...@apache.org.
update japicmp plugin version


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c4a6d8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c4a6d8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c4a6d8d7

Branch: refs/heads/feature-AppData
Commit: c4a6d8d75538bf623eaf803a42c27044ebbfd6da
Parents: e0ee8ab
Author: thomas <th...@datatorrent.com>
Authored: Fri Aug 21 22:45:39 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Fri Aug 21 22:45:39 2015 -0700

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c4a6d8d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba86445..ccdc00a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
           <plugin>
             <groupId>com.github.siom79.japicmp</groupId>
             <artifactId>japicmp-maven-plugin</artifactId>
-            <version>0.5.1</version>
+            <version>0.5.3</version>
             <configuration>
               <oldVersion>
                 <dependency>


[03/25] incubator-apex-malhar git commit: Removed references to name property in BaseOperator

Posted by da...@apache.org.
Removed references to name property in BaseOperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a0280691
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a0280691
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a0280691

Branch: refs/heads/feature-AppData
Commit: a0280691b82f54dd6e33f7cc01772a94373aa1da
Parents: 0a4250e
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Aug 3 14:17:02 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Aug 6 15:20:44 2015 -0700

----------------------------------------------------------------------
 .../test/java/com/datatorrent/demos/mobile/ApplicationTest.java | 2 --
 .../com/datatorrent/lib/io/SimpleSinglePortInputOperator.java   | 3 ++-
 .../java/com/datatorrent/lib/io/WebSocketInputOperator.java     | 3 ++-
 .../java/com/datatorrent/lib/io/WebSocketOutputOperator.java    | 5 ++---
 .../com/datatorrent/lib/multiwindow/SortedMovingWindow.java     | 3 ++-
 .../com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java | 1 -
 .../java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java | 1 -
 .../com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java     | 2 --
 8 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
index d58e8ff..3494417 100644
--- a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
+++ b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
@@ -67,12 +67,10 @@ public class ApplicationTest
     URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
 
     PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
-    outputOperator.setName("testOutputOperator");
     outputOperator.setUri(uri);
     outputOperator.setTopic(conf.get("dt.application.MobileDemo.operator.QueryLocation.topic"));
 
     PubSubWebSocketInputOperator<Map<String, String>> inputOperator = new PubSubWebSocketInputOperator<Map<String, String>>();
-    inputOperator.setName("testInputOperator");
     inputOperator.setUri(uri);
     inputOperator.setTopic(conf.get("dt.application.MobileDemo.operator.LocationResults.topic"));
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
index 07bcaf5..1fbd45f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.common.util.BaseOperator;
+import org.apache.commons.lang3.ClassUtils;
 
 /**
  * This an input operator which passes data from an asynchronous data source to a port processing thread.
@@ -60,7 +61,7 @@ public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator impl
   {
     isActive = true;
     if (this instanceof Runnable) {
-      ioThread = new Thread((Runnable)this, "io-" + this.getName());
+      ioThread = new Thread((Runnable)this, "io-" + ClassUtils.getShortClassName(this.getClass()));
       ioThread.start();
     }
   }
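
The substitution above recurs throughout this commit: thread and error-message names that previously came from the removed name property are now derived from the operator's class via commons-lang ClassUtils. A minimal, self-contained sketch of that pattern, assuming only commons-lang3 on the classpath (the class below is illustrative, not part of Malhar):

import org.apache.commons.lang3.ClassUtils;

public class NamingSketch implements Runnable
{
  @Override
  public void run()
  {
    // placeholder for the I/O work the operator would run on this thread
  }

  public Thread newIoThread()
  {
    // derive a readable thread name from the class instead of the removed getName()
    String shortName = ClassUtils.getShortClassName(this.getClass());
    return new Thread(this, "io-" + shortName);
  }

  public static void main(String[] args)
  {
    System.out.println(new NamingSketch().newIoThread().getName()); // prints: io-NamingSketch
  }
}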

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index a8cfa6e..69ebfa3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import javax.validation.constraints.NotNull;
+import org.apache.commons.lang3.ClassUtils;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -178,7 +179,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
         public Thread newThread(Runnable r)
         {
           Thread t = new Thread(r);
-          t.setName(WebSocketInputOperator.this.getName() + "-AsyncHttpClient-" + count++);
+          t.setName(ClassUtils.getShortClassName(this.getClass()) + "-AsyncHttpClient-" + count++);
           return t;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
index a7ab3bd..f46ccb8 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
@@ -19,8 +19,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.*;
 
-import javax.validation.constraints.NotNull;
-
 import com.ning.http.client.AsyncHttpClient;
 import com.ning.http.client.AsyncHttpClientConfigBean;
 import com.ning.http.client.websocket.WebSocket;
@@ -32,6 +30,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.ClassUtils;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -193,7 +192,7 @@ public class WebSocketOutputOperator<T> extends BaseOperator
       public Thread newThread(Runnable r)
       {
         Thread t = new Thread(r);
-        t.setName(WebSocketOutputOperator.this.getName() + "-AsyncHttpClient-" + count++);
+        t.setName(ClassUtils.getShortClassName(this.getClass()) + "-AsyncHttpClient-" + count++);
         return t;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java b/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
index 84388e4..df4d482 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
@@ -28,6 +28,7 @@ import javax.validation.constraints.NotNull;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.google.common.base.Function;
+import org.apache.commons.lang.ClassUtils;
 
 /**
  *
@@ -114,7 +115,7 @@ public class SortedMovingWindow<T, K> extends AbstractSlidingWindow<T, List<T>>
             k = ((Comparable<T>) expiredTuple).compareTo(minElemInSortedList);
           } else {
             errorOutput.emit(expiredTuple);
-            throw new IllegalArgumentException("Operator \"" + getName() + "\" encounters an invalid tuple " + expiredTuple + "\nNeither the tuple is comparable Nor Comparator is specified!");
+            throw new IllegalArgumentException("Operator \"" + ClassUtils.getShortClassName(this.getClass()) + "\" encounters an invalid tuple " + expiredTuple + "\nNeither the tuple is comparable Nor Comparator is specified!");
           }
         } else {
           k = comparator.compare(expiredTuple, minElemInSortedList);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
index 3adf90b..a3f5dc0 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
@@ -92,7 +92,6 @@ public class HttpJsonChunksInputOperatorTest
     CollectorTestSink sink = new CollectorTestSink();
 
     operator.outputPort.setSink(sink);
-    operator.setName("testHttpInputNode");
     operator.setUrl(new URI(url));
 
     operator.setup(null);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
index 538b6b4..d8b3778 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
@@ -80,7 +80,6 @@ public class HttpLinesInputOperatorTest
 
     final HttpLinesInputOperator operator = new HttpLinesInputOperator();
     CollectorTestSink<String> sink = TestUtils.setSink(operator.outputPort, new CollectorTestSink<String>());
-    operator.setName("testHttpInputNode");
     operator.setUrl(new URI(url));
 
     operator.setup(null);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
index 4bfcf45..778524b 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
@@ -51,12 +51,10 @@ public class PubSubWebSocketOperatorTest
     URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
 
     PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
-    outputOperator.setName("testOutputOperator");
     outputOperator.setUri(uri);
     outputOperator.setTopic("testTopic");
 
     PubSubWebSocketInputOperator<Object> inputOperator = new PubSubWebSocketInputOperator<Object>();
-    inputOperator.setName("testInputOperator");
     inputOperator.setUri(uri);
     inputOperator.setTopic("testTopic");
 


[09/25] incubator-apex-malhar git commit: Added default NOOP instance for idempotency manager

Posted by da...@apache.org.
Added default NOOP instance for idempotency manager


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4dc4788f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4dc4788f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4dc4788f

Branch: refs/heads/feature-AppData
Commit: 4dc4788f74178509eb01cc1d4402522601095dcd
Parents: 13a3fbe
Author: ishark <is...@datatorrent.com>
Authored: Tue Aug 11 15:10:01 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Tue Aug 11 15:10:01 2015 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                                | 6 ------
 .../contrib/rabbitmq/AbstractRabbitMQInputOperator.java        | 1 +
 2 files changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4dc4788f/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 76e8144..9776e2f 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -565,11 +565,5 @@
       <version>${dt.framework.version}</version>
       <type>jar</type>
     </dependency>
-    <dependency>
-      <groupId>com.datatorrent</groupId>
-      <artifactId>dt-engine</artifactId>
-      <version>${dt.framework.version}</version>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4dc4788f/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index e408f5e..955a2c8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -111,6 +111,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
     currentWindowRecoveryState = new HashMap<Long, byte[]>();
     pendingAck = new HashSet<Long>();
     recoveredTags = new HashSet<Long>();
+    idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager();
   }
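
The new default gives the manager field a safe no-op value alongside the other field initialization shown above, so the operator works without any idempotency configuration while a persistent IdempotentStorageManager can still be injected when exactly-once replay is wanted. A minimal, self-contained sketch of that default-plus-override pattern (the interfaces below are illustrative stand-ins, not the Malhar API):

public class RabbitInputSketch
{
  // illustrative stand-in for the idempotent storage manager contract
  interface StorageManager
  {
    void save(long windowId, byte[] state);
  }

  // no-op default: discards recovery state, so replay after failure is not idempotent
  static class NoopStorageManager implements StorageManager
  {
    @Override
    public void save(long windowId, byte[] state)
    {
      // intentionally empty
    }
  }

  private StorageManager storageManager = new NoopStorageManager();

  // callers that need exactly-once replay inject a persistent implementation here
  public void setStorageManager(StorageManager storageManager)
  {
    this.storageManager = storageManager;
  }

  public void endWindow(long windowId, byte[] recoveryState)
  {
    storageManager.save(windowId, recoveryState); // never null, so no guard is needed
  }
}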
 
   


[20/25] incubator-apex-malhar git commit: Fix malhar-contrib dependency version

Posted by da...@apache.org.
Fix malhar-contrib dependency version


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6155673e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6155673e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6155673e

Branch: refs/heads/feature-AppData
Commit: 6155673e5518d24f0459a29b00b6eb05f04556bc
Parents: e7a475c
Author: thomas <th...@datatorrent.com>
Authored: Sat Aug 22 00:01:46 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Sat Aug 22 00:01:46 2015 -0700

----------------------------------------------------------------------
 apps/logstream/pom.xml | 2 +-
 apps/pom.xml           | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6155673e/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index 9643daf..3359efc 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -53,7 +53,7 @@
     <dependency>
       <groupId>com.datatorrent</groupId>
       <artifactId>malhar-contrib</artifactId>
-      <version>${datatorrent.version}</version>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.codehaus.janino</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6155673e/apps/pom.xml
----------------------------------------------------------------------
diff --git a/apps/pom.xml b/apps/pom.xml
index b984069..34b2376 100644
--- a/apps/pom.xml
+++ b/apps/pom.xml
@@ -18,7 +18,6 @@
 
   <properties>
     <!-- change this if you desire to use a different version of DataTorrent -->
-    <datatorrent.version>3.0.0</datatorrent.version>
     <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
     <maven.deploy.skip>true</maven.deploy.skip>
     <maven.install.skip>true</maven.install.skip>


[02/25] incubator-apex-malhar git commit: Fix project version.

Posted by da...@apache.org.
Fix project version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0a4250e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0a4250e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0a4250e0

Branch: refs/heads/feature-AppData
Commit: 0a4250e0bda347562784d54f7c1b6add0685ac37
Parents: a980e06
Author: thomas <th...@datatorrent.com>
Authored: Tue Aug 4 21:26:40 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Tue Aug 4 21:26:40 2015 -0700

----------------------------------------------------------------------
 benchmark/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a4250e0/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index fa6a975..0d07203 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <artifactId>malhar</artifactId>
     <groupId>com.datatorrent</groupId>
-    <version>3.0.0</version>
+    <version>3.1.0-SNAPSHOT</version>
   </parent>
 
   <groupId>com.datatorrent</groupId>


[06/25] incubator-apex-malhar git commit: enable server plugin

Posted by da...@apache.org.
enable server plugin


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2e5813d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2e5813d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2e5813d4

Branch: refs/heads/feature-AppData
Commit: 2e5813d4a395f8f3d85ffd7f518bdf7672c3b9cc
Parents: 93ce29c
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Mon Aug 10 11:53:27 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Mon Aug 10 11:56:29 2015 -0700

----------------------------------------------------------------------
 apps/logstream/pom.xml |  1 +
 demos/pom.xml          |  1 +
 pom.xml                | 52 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2e5813d4/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index 7321baa..ce0f4ce 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -15,6 +15,7 @@
   <properties>
     <maven.deploy.skip>false</maven.deploy.skip>
     <skipTests>false</skipTests>
+    <semver.plugin.skip>true</semver.plugin.skip>
   </properties>
 
   <name>Logstream Application</name>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2e5813d4/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 64c6c4a..a5601a5 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -30,6 +30,7 @@
   <properties>
     <datatorrent.version>3.0.0</datatorrent.version>
     <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
+    <semver.plugin.skip>true</semver.plugin.skip>
   </properties>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2e5813d4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 81a699b..ba86445 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,8 +43,60 @@
     <jackson.version>1.9.2</jackson.version>
     <jersey.version>1.9</jersey.version>
     <jetty.version>8.1.10.v20130312</jetty.version>
+    <semver.plugin.skip>false</semver.plugin.skip>
   </properties>
 
+  <profiles>
+    <profile>
+      <activation>
+        <file>
+          <exists>src</exists>
+        </file>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>com.github.siom79.japicmp</groupId>
+            <artifactId>japicmp-maven-plugin</artifactId>
+            <version>0.5.1</version>
+            <configuration>
+              <oldVersion>
+                <dependency>
+                  <groupId>com.datatorrent</groupId>
+                  <artifactId>${project.artifactId}</artifactId>
+                  <version>3.0.0</version>
+                </dependency>
+              </oldVersion>
+              <newVersion>
+                <file>
+                  <path>${project.build.directory}/${project.artifactId}-${project.version}.jar</path>
+                </file>
+              </newVersion>
+              <parameter>
+                <onlyModified>true</onlyModified>
+                <accessModifier>protected</accessModifier>
+                <breakBuildOnModifications>false</breakBuildOnModifications>
+                <breakBuildOnBinaryIncompatibleModifications>true</breakBuildOnBinaryIncompatibleModifications>
+                <onlyBinaryIncompatible>false</onlyBinaryIncompatible>
+                <includeSynthetic>false</includeSynthetic>
+                <ignoreMissingClasses>true</ignoreMissingClasses>
+              </parameter>
+              <skip>${semver.plugin.skip}</skip>
+            </configuration>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>cmp</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <modules>
     <module>library</module>
     <module>contrib</module>


[19/25] incubator-apex-malhar git commit: Created new release branch. Updating version to 3.2.0-SNAPSHOT

Posted by da...@apache.org.
Created new release branch. Updating version to 3.2.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e7a475c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e7a475c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e7a475c1

Branch: refs/heads/feature-AppData
Commit: e7a475c11f3b2809de1be859c142738d67dda39f
Parents: 3e07843
Author: thomas <th...@datatorrent.com>
Authored: Fri Aug 21 23:55:05 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Fri Aug 21 23:55:05 2015 -0700

----------------------------------------------------------------------
 apps/logstream/pom.xml     | 2 +-
 apps/pom.xml               | 2 +-
 benchmark/pom.xml          | 2 +-
 contrib/pom.xml            | 2 +-
 demos/echoserver/pom.xml   | 2 +-
 demos/frauddetect/pom.xml  | 4 ++--
 demos/machinedata/pom.xml  | 4 ++--
 demos/mobile/pom.xml       | 4 ++--
 demos/mrmonitor/pom.xml    | 4 ++--
 demos/mroperator/pom.xml   | 4 ++--
 demos/pi/pom.xml           | 4 ++--
 demos/pom.xml              | 2 +-
 demos/r/pom.xml            | 4 ++--
 demos/twitter/pom.xml      | 4 ++--
 demos/uniquecount/pom.xml  | 4 ++--
 demos/wordcount/pom.xml    | 4 ++--
 demos/yahoofinance/pom.xml | 4 ++--
 library/pom.xml            | 2 +-
 pom.xml                    | 2 +-
 samples/pom.xml            | 2 +-
 20 files changed, 31 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index 67df24c..9643daf 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <artifactId>malhar-apps</artifactId>
     <groupId>com.datatorrent</groupId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <groupId>com.datatorrent</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/apps/pom.xml
----------------------------------------------------------------------
diff --git a/apps/pom.xml b/apps/pom.xml
index e816985..b984069 100644
--- a/apps/pom.xml
+++ b/apps/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>malhar-apps</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index 0d07203..a2a7de2 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <artifactId>malhar</artifactId>
     <groupId>com.datatorrent</groupId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <groupId>com.datatorrent</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 50d7234..ee3c8bc 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>malhar-contrib</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/echoserver/pom.xml
----------------------------------------------------------------------
diff --git a/demos/echoserver/pom.xml b/demos/echoserver/pom.xml
index 3d29f7f..ab64811 100644
--- a/demos/echoserver/pom.xml
+++ b/demos/echoserver/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <artifactId>malhar-demos</artifactId>
     <groupId>com.datatorrent</groupId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
   
   <groupId>com.datatorrent</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/frauddetect/pom.xml
----------------------------------------------------------------------
diff --git a/demos/frauddetect/pom.xml b/demos/frauddetect/pom.xml
index 13c9fcd..1313311 100644
--- a/demos/frauddetect/pom.xml
+++ b/demos/frauddetect/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>frauddetect-demo</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/machinedata/pom.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/pom.xml b/demos/machinedata/pom.xml
index 3498d0d..e36d83f 100644
--- a/demos/machinedata/pom.xml
+++ b/demos/machinedata/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>machinedata-demo</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/mobile/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mobile/pom.xml b/demos/mobile/pom.xml
index cd55a7b..6dfc7f6 100644
--- a/demos/mobile/pom.xml
+++ b/demos/mobile/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>mobile-demo</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/mrmonitor/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/pom.xml b/demos/mrmonitor/pom.xml
index 181343a..7e04f60 100644
--- a/demos/mrmonitor/pom.xml
+++ b/demos/mrmonitor/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>mrmonitor</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/mroperator/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mroperator/pom.xml b/demos/mroperator/pom.xml
index 4e09392..abf9abe 100644
--- a/demos/mroperator/pom.xml
+++ b/demos/mroperator/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>mroperator</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/pi/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pi/pom.xml b/demos/pi/pom.xml
index 5719b2c..33ee252 100644
--- a/demos/pi/pom.xml
+++ b/demos/pi/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>pi-demo</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 7b3aa08..2e93e02 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>malhar-demos</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/r/pom.xml
----------------------------------------------------------------------
diff --git a/demos/r/pom.xml b/demos/r/pom.xml
index 42c203c..3b036ee 100644
--- a/demos/r/pom.xml
+++ b/demos/r/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>r-demo</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/pom.xml b/demos/twitter/pom.xml
index 8a0bec9..09ce1e2 100644
--- a/demos/twitter/pom.xml
+++ b/demos/twitter/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>twitter-demo</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/uniquecount/pom.xml
----------------------------------------------------------------------
diff --git a/demos/uniquecount/pom.xml b/demos/uniquecount/pom.xml
index 8b7d3fc..a75c56e 100644
--- a/demos/uniquecount/pom.xml
+++ b/demos/uniquecount/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>uniquecount</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/wordcount/pom.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/pom.xml b/demos/wordcount/pom.xml
index 779b4f7..a3834ba 100644
--- a/demos/wordcount/pom.xml
+++ b/demos/wordcount/pom.xml
@@ -4,7 +4,7 @@
   
     
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>wordcount-demo</artifactId>
   <packaging>jar</packaging>
 
@@ -14,7 +14,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/yahoofinance/pom.xml
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/pom.xml b/demos/yahoofinance/pom.xml
index 3b4db0c..80bbb4a 100644
--- a/demos/yahoofinance/pom.xml
+++ b/demos/yahoofinance/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <artifactId>yahoo-finance-demo</artifactId>
   <packaging>jar</packaging>
 
@@ -13,7 +13,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar-demos</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 77b8506..defd214 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -6,7 +6,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>malhar-library</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ccdc00a..c95f524 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
   </parent>
 
   <artifactId>malhar</artifactId>
-  <version>3.1.0-SNAPSHOT</version>
+  <version>3.2.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>Malhar Open Source</name>
   <url>https://www.datatorrent.com/</url>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/samples/pom.xml
----------------------------------------------------------------------
diff --git a/samples/pom.xml b/samples/pom.xml
index 7e8b274..f1c13ba 100644
--- a/samples/pom.xml
+++ b/samples/pom.xml
@@ -6,7 +6,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>malhar</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>malhar-samples</artifactId>


[05/25] incubator-apex-malhar git commit: omitting type from FieldInfo

Posted by da...@apache.org.
omitting type from FieldInfo


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f6d85fea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f6d85fea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f6d85fea

Branch: refs/heads/feature-AppData
Commit: f6d85feaf573f682c7f1670904c9d7c073b50b37
Parents: f40ba34
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Fri Aug 7 14:54:25 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Fri Aug 7 17:09:20 2015 -0700

----------------------------------------------------------------------
 library/src/main/java/com/datatorrent/lib/util/FieldInfo.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f6d85fea/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
index a4e4923..5b41ace 100644
--- a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
+++ b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
@@ -100,6 +100,7 @@ public class FieldInfo
 
   /**
    * the Java type of the column
+   * @omitFromUI
    */
   public void setType(SupportType type)
   {
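
The only change is the @omitFromUI javadoc tag which, as the commit title indicates, keeps the column type property from being exposed through UI property introspection. A minimal illustrative bean showing where the tag goes (not the real FieldInfo class; SupportType is replaced by String here):

public class ColumnInfoSketch
{
  private String type;

  /**
   * the Java type of the column
   * @omitFromUI
   */
  public void setType(String type)
  {
    this.type = type;
  }

  public String getType()
  {
    return type;
  }
}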


[04/25] incubator-apex-malhar git commit: Fixed Twitter demo for App Builder

Posted by da...@apache.org.
Fixed Twitter demo for App Builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f40ba346
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f40ba346
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f40ba346

Branch: refs/heads/feature-AppData
Commit: f40ba346ae660e1b89de1ebbb3f8e1f7ba31cad0
Parents: a028069
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Aug 6 16:07:03 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Thu Aug 6 22:14:05 2015 -0700

----------------------------------------------------------------------
 .../contrib/twitter/TwitterSampleInput.java     | 32 ++++++++++++++++++++
 demos/pom.xml                                   |  4 +--
 demos/twitter/pom.xml                           |  2 +-
 .../demos/twitter/WindowedTopCounter.java       |  5 +++
 4 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f40ba346/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java b/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java
index 916bea8..9daef7f 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java
@@ -293,6 +293,14 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope
   }
 
   /**
+   * @return the consumerKey
+   */
+  public String getConsumerKey()
+  {
+    return consumerKey;
+  }
+
+  /**
    * @param consumerKey the consumerKey to set
    */
   public void setConsumerKey(String consumerKey)
@@ -301,6 +309,14 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope
   }
 
   /**
+   * @return the consumerSecret
+   */
+  public String getConsumerSecret()
+  {
+    return consumerSecret;
+  }
+
+  /**
    * @param consumerSecret the consumerSecret to set
    */
   public void setConsumerSecret(String consumerSecret)
@@ -309,6 +325,14 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope
   }
 
   /**
+   * @return the accessToken
+   */
+  public String getAccessToken()
+  {
+    return accessToken;
+  }
+
+  /**
    * @param accessToken the accessToken to set
    */
   public void setAccessToken(String accessToken)
@@ -317,6 +341,14 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope
   }
 
   /**
+   * @return the accessTokenSecret
+   */
+  public String getAccessTokenSecret()
+  {
+    return accessTokenSecret;
+  }
+
+  /**
    * @param accessTokenSecret the accessTokenSecret to set
    */
   public void setAccessTokenSecret(String accessTokenSecret)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f40ba346/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 64c6c4a..c17120f 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -173,7 +173,7 @@
     <dependency>
       <groupId>com.datatorrent</groupId>
       <artifactId>malhar-library</artifactId>
-      <version>${datatorrent.version}</version>
+      <version>${project.version}</version>
       <exclusions>
         <exclusion>
           <groupId>*</groupId>
@@ -184,7 +184,7 @@
     <dependency>
       <groupId>com.datatorrent</groupId>
       <artifactId>malhar-library</artifactId>
-      <version>${datatorrent.version}</version>
+      <version>${project.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
       <exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f40ba346/demos/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/pom.xml b/demos/twitter/pom.xml
index c55ee79..8a0bec9 100644
--- a/demos/twitter/pom.xml
+++ b/demos/twitter/pom.xml
@@ -54,7 +54,7 @@
     <dependency>
       <groupId>com.datatorrent</groupId>
       <artifactId>malhar-contrib</artifactId>
-      <version>${datatorrent.version}</version>
+      <version>${project.version}</version>
       <exclusions>
         <exclusion>
           <groupId>*</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f40ba346/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
index 2edf7fd..3354ed4 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
@@ -179,6 +179,11 @@ public class WindowedTopCounter<T> extends BaseOperator
     topCount = count;
   }
 
+  public int getTopCount()
+  {
+    return topCount;
+  }
+
   /**
    * @return the windows
    */


[24/25] incubator-apex-malhar git commit: Fix NPE if an embeddable query info provider is not set on the snapshot server.

Posted by da...@apache.org.
Fix NPE if an embeddable query info provider is not set on the snapshot server.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b24e99f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b24e99f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b24e99f7

Branch: refs/heads/feature-AppData
Commit: b24e99f74bb09350eeac22b5b7de482f86a6ccd7
Parents: d19f746
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Wed Aug 26 18:00:23 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:32 2015 -0700

----------------------------------------------------------------------
 .../lib/appdata/snapshot/AbstractAppDataSnapshotServer.java      | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b24e99f7/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index a51908f..af89109 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -158,7 +158,9 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   @Override
   final public void activate(OperatorContext ctx)
   {
-    embeddableQueryInfoProvider.activate(ctx);
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.activate(ctx);
+    }
   }
 
   @SuppressWarnings("unchecked")


[18/25] incubator-apex-malhar git commit: Remove redundant version

Posted by da...@apache.org.
Remove redundant version


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3e07843f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3e07843f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3e07843f

Branch: refs/heads/feature-AppData
Commit: 3e07843f8396a6f431410ab71eefcc1df9c175c0
Parents: c4a6d8d
Author: thomas <th...@datatorrent.com>
Authored: Fri Aug 21 23:49:16 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Fri Aug 21 23:49:16 2015 -0700

----------------------------------------------------------------------
 apps/logstream/pom.xml   | 1 -
 demos/echoserver/pom.xml | 1 -
 2 files changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3e07843f/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index ce0f4ce..67df24c 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -8,7 +8,6 @@
   </parent>
 
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
   <artifactId>logstream</artifactId>
   <packaging>jar</packaging>
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3e07843f/demos/echoserver/pom.xml
----------------------------------------------------------------------
diff --git a/demos/echoserver/pom.xml b/demos/echoserver/pom.xml
index ad20744..3d29f7f 100644
--- a/demos/echoserver/pom.xml
+++ b/demos/echoserver/pom.xml
@@ -8,7 +8,6 @@
   </parent>
   
   <groupId>com.datatorrent</groupId>
-  <version>3.1.0-SNAPSHOT</version>
   <artifactId>echoserver</artifactId>
   <packaging>jar</packaging>
 


[12/25] incubator-apex-malhar git commit: Removed checking all the window ids in idempotency storage before replay

Posted by da...@apache.org.
Removed checking all the window ids in idempotency storage before replay


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ada42ab9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ada42ab9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ada42ab9

Branch: refs/heads/feature-AppData
Commit: ada42ab9ba0fe17162a38162b6d889afc91e741c
Parents: a57a3d7
Author: ishark <is...@datatorrent.com>
Authored: Fri Aug 14 14:37:24 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 14 14:37:24 2015 -0700

----------------------------------------------------------------------
 .../redis/AbstractRedisInputOperator.java       | 23 ++++++++++----------
 1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ada42ab9/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 7f79bd0..260fbf6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -35,7 +35,8 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
  * @category Input
  * @tags redis, key value
  *
- * @param <T> The tuple type.
+ * @param <T>
+ *          The tuple type.
  * @since 0.9.3
  */
 public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
@@ -47,6 +48,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
   private transient Integer backupOffset;
   private int scanCount;
   private transient boolean replay;
+  private transient boolean skipOffsetRecovery = true;
 
   @NotNull
   private IdempotentStorageManager idempotentStorageManager;
@@ -92,33 +94,27 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
   private void replay(long windowId)
   {
     try {
-      if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+      // For first recovered window, offset is already part of recovery state.
+      // So skip reading from idempotency manager
+      if (!skipOffsetRecovery) {
         // Begin offset for this window is recovery offset stored for the last
         // window
         RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1);
         recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
       }
-
+      skipOffsetRecovery = false;
       RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId);
       recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
       if (recoveryState.scanOffsetAtBeginWindow != null) {
         scanOffset = recoveryState.scanOffsetAtBeginWindow;
       }
       replay = true;
+
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
   }
 
-  private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException
-  {
-    long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId());
-    if(windowsIds.length == 0 || windowId < windowsIds[0] || windowId > windowsIds[windowsIds.length - 1]) {
-      return false;
-    }
-    return true ;
-  }
-
   private void scanKeysFromOffset()
   {
     if (!scanComplete) {
@@ -157,11 +153,14 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
     scanComplete = false;
     scanParameters = new ScanParams();
     scanParameters.count(scanCount);
+    
     // For the 1st window after checkpoint, windowID - 1 would not have recovery
     // offset stored in idempotentStorageManager
     // But recoveryOffset is non-transient, so will be recovered with
     // checkPointing
+    // Offset recovery from idempotency storage can be skipped in this case
     scanOffset = recoveryState.scanOffsetAtBeginWindow;
+    skipOffsetRecovery = true;
   }
 
   @Override
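
For reference, the control flow of this change distilled into a standalone sketch (illustrative only; the names are simplified stand-ins for the operator's RecoveryState/IdempotentStorageManager plumbing shown in the diff):

  import java.util.HashMap;
  import java.util.Map;

  public class OffsetReplaySketch
  {
    // stand-in for IdempotentStorageManager.load(operatorId, windowId)
    private final Map<Long, Integer> storedOffsets = new HashMap<Long, Integer>();

    private boolean skipOffsetRecovery;
    private Integer scanOffset;

    public void setup(Integer checkpointedOffset)
    {
      // the first recovered window's begin offset is restored with checkpointed
      // state, so no lookup in idempotency storage is needed for it
      scanOffset = checkpointedOffset;
      skipOffsetRecovery = true;
    }

    public void replay(long windowId)
    {
      if (!skipOffsetRecovery) {
        // later replayed windows: begin offset is the offset stored for the
        // previous window; no need to list all stored window ids first
        scanOffset = storedOffsets.get(windowId - 1);
      }
      skipOffsetRecovery = false;
      // ... re-emit the tuples recorded for windowId ...
    }
  }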


[22/25] incubator-apex-malhar git commit: Added the ability to sort schemas returned in a schema result.

Posted by da...@apache.org.
Added the ability to sort schemas returned in a schema result.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d19f7463
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d19f7463
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d19f7463

Branch: refs/heads/feature-AppData
Commit: d19f7463a34ae216aab605c3d16b6bdfdfa78b29
Parents: 819e175
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Aug 13 11:10:46 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:32 2015 -0700

----------------------------------------------------------------------
 .../appdata/schemas/SchemaRegistryMultiple.java | 28 +++++++++++++++++---
 1 file changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d19f7463/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
index 9833e94..832cf0a 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
@@ -15,8 +15,11 @@
  */
 package com.datatorrent.lib.appdata.schemas;
 
+import com.google.common.base.Preconditions;
 import java.io.Serializable;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
@@ -35,6 +38,7 @@ public class SchemaRegistryMultiple implements SchemaRegistry, Serializable
    * The dimensional table which holds the mapping from schema keys to schemas.
    */
   private DimensionalTable<Schema> table;
+  private Comparator<Schema> schemaComparator;
 
   /**
    * Constructor for serialization.
@@ -53,20 +57,36 @@ public class SchemaRegistryMultiple implements SchemaRegistry, Serializable
     table = new DimensionalTable<Schema>(schemaKeys);
   }
 
+  /**
+   * The names of all the schema keys for all schemas in this registry.
+   * @param schemaKeys The names of all the schema keys for all schemas in this registry.
+   * @param schemaComparator The comparator used to order the schemas returned in the {@link SchemaResult} produced
+   * by {@link SchemaRegistryMultiple#getSchemaResult(com.datatorrent.lib.appdata.schemas.SchemaQuery)}
+   */
+  public SchemaRegistryMultiple(List<String> schemaKeys,
+                                Comparator<Schema> schemaComparator)
+  {
+    this(schemaKeys);
+    this.schemaComparator = Preconditions.checkNotNull(schemaComparator);
+  }
+
   @Override
   public SchemaResult getSchemaResult(SchemaQuery schemaQuery)
   {
     Map<String, String> schemaKeys = schemaQuery.getSchemaKeys();
     List<Schema> data = null;
 
-    if(schemaKeys == null) {
+    if (schemaKeys == null) {
       data = table.getAllDataPoints();
-    }
-    else {
+    } else {
       data = table.getDataPoints(schemaKeys);
     }
 
-    if(data.isEmpty()) {
+    if (schemaComparator != null) {
+      Collections.sort(data, schemaComparator);
+    }
+
+    if (data.isEmpty()) {
       return null;
     }
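
A minimal usage sketch of the comparator-based constructor added above (illustrative only: the schema key names and the getSchemaID() accessor are assumptions for the example, not taken from this commit):

  import java.util.Arrays;
  import java.util.Comparator;

  import com.datatorrent.lib.appdata.schemas.Schema;
  import com.datatorrent.lib.appdata.schemas.SchemaRegistryMultiple;

  public class SortedSchemaRegistryFactory
  {
    public static SchemaRegistryMultiple newRegistry()
    {
      // order the schemas returned in a SchemaResult deterministically
      Comparator<Schema> bySchemaId = new Comparator<Schema>()
      {
        @Override
        public int compare(Schema a, Schema b)
        {
          return Integer.compare(a.getSchemaID(), b.getSchemaID()); // assumed accessor
        }
      };

      // "publisher" and "advertiser" are example schema key names
      return new SchemaRegistryMultiple(Arrays.asList("publisher", "advertiser"), bySchemaId);
    }
  }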
 


[16/25] incubator-apex-malhar git commit: Merge branch 'ishark-fixVersionCompatibilityIssue' into v3.1.0

Posted by da...@apache.org.
Merge branch 'ishark-fixVersionCompatibilityIssue' into v3.1.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e0ee8ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e0ee8ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e0ee8ab6

Branch: refs/heads/feature-AppData
Commit: e0ee8ab62b2a0d99dbc79bd4726dc33d3dfed96f
Parents: 0b31fee 731b8bb
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Tue Aug 18 13:51:46 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Tue Aug 18 13:51:46 2015 -0700

----------------------------------------------------------------------
 .../contrib/redis/AbstractRedisInputOperator.java           | 7 +++++--
 .../contrib/redis/RedisKeyValueInputOperator.java           | 9 +++++++++
 .../contrib/redis/RedisMapAsValueInputOperator.java         | 8 ++++++++
 .../datatorrent/contrib/redis/RedisPOJOInputOperator.java   | 7 +++++++
 4 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[14/25] incubator-apex-malhar git commit: Merge pull request #1533 from 243826/semantic-versioning-minor

Posted by da...@apache.org.
Merge pull request #1533 from 243826/semantic-versioning-minor

enable server plugin

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0b31fee5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0b31fee5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0b31fee5

Branch: refs/heads/feature-AppData
Commit: 0b31fee52afdbe11f6100b3881481b1962bb164c
Parents: 717168b 2e5813d
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Mon Aug 17 17:42:10 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Mon Aug 17 17:42:10 2015 -0700

----------------------------------------------------------------------
 apps/logstream/pom.xml |  1 +
 demos/pom.xml          |  1 +
 pom.xml                | 52 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0b31fee5/demos/pom.xml
----------------------------------------------------------------------


[08/25] incubator-apex-malhar git commit: MLHR-1734 #resolve Added idempotency changes for RabbitMQ input and output operator

Posted by da...@apache.org.
MLHR-1734 #resolve Added idempotency changes for RabbitMQ input and output operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/13a3fbea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/13a3fbea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/13a3fbea

Branch: refs/heads/feature-AppData
Commit: 13a3fbea74b7deaf674c5f42dd945ebd01e17f65
Parents: 8e94665
Author: ishark <is...@datatorrent.com>
Authored: Mon Jun 29 16:14:14 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Mon Aug 10 19:00:55 2015 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../rabbitmq/AbstractRabbitMQInputOperator.java | 141 +++++++++++++++++--
 .../AbstractRabbitMQOutputOperator.java         |  65 ++++++++-
 ...bstractSinglePortRabbitMQOutputOperator.java |   5 +-
 .../rabbitmq/RabbitMQInputOperatorTest.java     |  86 ++++++++---
 .../rabbitmq/RabbitMQOutputOperatorTest.java    |  28 ++--
 6 files changed, 283 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 9776e2f..76e8144 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -565,5 +565,11 @@
       <version>${dt.framework.version}</version>
       <type>jar</type>
     </dependency>
+    <dependency>
+      <groupId>com.datatorrent</groupId>
+      <artifactId>dt-engine</artifactId>
+      <version>${dt.framework.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 06b3b88..e408f5e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -16,12 +16,22 @@
 package com.datatorrent.contrib.rabbitmq;
 
 import com.datatorrent.api.*;
-import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
 import com.rabbitmq.client.*;
+
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+
 import javax.validation.constraints.NotNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,9 +71,9 @@ import org.slf4j.LoggerFactory;
  *
  * @since 0.3.2
  */
-public abstract class AbstractRabbitMQInputOperator<T>
-    implements InputOperator,
-Operator.ActivationListener<OperatorContext>
+public abstract class AbstractRabbitMQInputOperator<T> implements
+    InputOperator, Operator.ActivationListener<OperatorContext>,
+    Operator.CheckpointListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);
   @NotNull
@@ -87,8 +97,23 @@ Operator.ActivationListener<OperatorContext>
   protected transient Channel channel;
   protected transient TracingConsumer tracingConsumer;
   protected transient String cTag;
-  protected transient ArrayBlockingQueue<byte[]> holdingBuffer;
+  
+  protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> holdingBuffer;
+  private IdempotentStorageManager idempotentStorageManager;
+  protected final transient Map<Long, byte[]> currentWindowRecoveryState;
+  private transient final Set<Long> pendingAck;
+  private transient final Set<Long> recoveredTags;
+  private transient long currentWindowId;
+  private transient int operatorContextId;
+  
+  public AbstractRabbitMQInputOperator()
+  {
+    currentWindowRecoveryState = new HashMap<Long, byte[]>();
+    pendingAck = new HashSet<Long>();
+    recoveredTags = new HashSet<Long>();
+  }
 
+  
 /**
  * define a consumer which can asynchronously receive data,
  * and added to holdingBuffer
@@ -124,8 +149,19 @@ Operator.ActivationListener<OperatorContext>
     @Override
     public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
     {
-      holdingBuffer.add(body);
-//      logger.debug("Received Async message:" + new String(body)+" buffersize:"+holdingBuffer.size());
+      long tag = envelope.getDeliveryTag();
+      if(envelope.isRedeliver() && (recoveredTags.contains(tag) || pendingAck.contains(tag)))
+      {
+        if(recoveredTags.contains(tag)) {
+          pendingAck.add(tag);
+        }
+        return;
+      }
+      
+      // Acknowledgements are sent at the end of the window after adding to idempotency manager
+      pendingAck.add(tag);
+      holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body));
+      logger.debug("Received Async message: {}  buffersize: {} ", new String(body), holdingBuffer.size());
     }
   }
 
@@ -137,7 +173,9 @@ Operator.ActivationListener<OperatorContext>
       ntuples = holdingBuffer.size();
     }
     for (int i = ntuples; i-- > 0;) {
-      emitTuple(holdingBuffer.poll());
+      KeyValPair<Long, byte[]> message =  holdingBuffer.poll();
+      currentWindowRecoveryState.put(message.getKey(), message.getValue());
+      emitTuple(message.getValue());
     }
   }
 
@@ -146,22 +184,72 @@ Operator.ActivationListener<OperatorContext>
   @Override
   public void beginWindow(long windowId)
   {
+    currentWindowId = windowId;
+    if (windowId <= this.idempotentStorageManager.getLargestRecoveryWindow()) {
+      replay(windowId);
+    }
   }
 
+  @SuppressWarnings("unchecked")
+  private void replay(long windowId) {      
+    Map<Long, byte[]> recoveredData;
+    try {
+      recoveredData = (Map<Long, byte[]>) this.idempotentStorageManager.load(operatorContextId, windowId);
+      if (recoveredData == null) {
+        return;
+      }
+      for (Entry<Long, byte[]>  recoveredEntry : recoveredData.entrySet()) {
+        recoveredTags.add(recoveredEntry.getKey());
+        emitTuple(recoveredEntry.getValue());
+      }
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  
   @Override
   public void endWindow()
   {
+    //No more messages can be consumed now. so we will call emit tuples once more
+    //so that any pending messages can be emitted.
+    KeyValPair<Long, byte[]> message;
+    while ((message = holdingBuffer.poll()) != null) {
+      currentWindowRecoveryState.put(message.getKey(), message.getValue());
+      emitTuple(message.getValue());      
+    }
+    
+    try {
+      this.idempotentStorageManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    
+    currentWindowRecoveryState.clear();
+    
+    for (Long deliveryTag : pendingAck) {
+      try {
+        channel.basicAck(deliveryTag, false);
+      } catch (IOException e) {        
+        DTThrowable.rethrow(e);
+      }
+    }
+    
+    pendingAck.clear();
   }
 
   @Override
   public void setup(OperatorContext context)
   {
-    holdingBuffer = new ArrayBlockingQueue<byte[]>(bufferSize);
+    this.operatorContextId = context.getId();
+    holdingBuffer = new ArrayBlockingQueue<KeyValPair<Long, byte[]>>(bufferSize);
+    this.idempotentStorageManager.setup(context);
   }
 
   @Override
   public void teardown()
   {
+    this.idempotentStorageManager.teardown();
   }
 
   @Override
@@ -178,10 +266,12 @@ Operator.ActivationListener<OperatorContext>
       channel = connection.createChannel();
 
       channel.exchangeDeclare(exchange, exchangeType);
+      boolean resetQueueName = false;
       if (queueName == null){
         // unique queuename is generated
         // used in case of fanout exchange
         queueName = channel.queueDeclare().getQueue();
+        resetQueueName = true;
       } else {
         // user supplied name
         // used in case of direct exchange
@@ -193,7 +283,11 @@ Operator.ActivationListener<OperatorContext>
 //      consumer = new QueueingConsumer(channel);
 //      channel.basicConsume(queueName, true, consumer);
       tracingConsumer = new TracingConsumer(channel);
-      cTag = channel.basicConsume(queueName, true, tracingConsumer);
+      cTag = channel.basicConsume(queueName, false, tracingConsumer);
+      if(resetQueueName)
+      {
+        queueName = null;
+      }
     }
     catch (IOException ex) {
       throw new RuntimeException("Connection Failure", ex);
@@ -211,6 +305,23 @@ Operator.ActivationListener<OperatorContext>
       logger.debug(ex.toString());
     }
   }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      idempotentStorageManager.deleteUpTo(operatorContextId, windowId);
+    }
+    catch (IOException e) {
+      throw new RuntimeException("committing", e);
+    }
+  }
+
   public void setTupleBlast(int i)
   {
     this.tuple_blast = i;
@@ -275,5 +386,15 @@ Operator.ActivationListener<OperatorContext>
   {
     this.routingKey = routingKey;
   }
+  
+  public IdempotentStorageManager getIdempotentStorageManager() {
+    return idempotentStorageManager;
+  }
+  
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {
+    this.idempotentStorageManager = idempotentStorageManager;
+  }
+  
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
index a78febb..cc6b7db 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
@@ -16,12 +16,16 @@
 package com.datatorrent.contrib.rabbitmq;
 
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.api.Context.OperatorContext;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
+
 import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,21 +70,70 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
   transient Channel channel = null;
   transient String exchange = "testEx";
   transient String queueName="testQ";
+  
+  private IdempotentStorageManager idempotentStorageManager;  
+  private transient long currentWindowId;
+  private transient long largestRecoveryWindowId;
+  private transient int operatorContextId;
+  protected transient boolean skipProcessingTuple = false;
+  private transient OperatorContext context;
+
 
   @Override
   public void setup(OperatorContext context)
   {
+    // Needed to setup idempotency storage manager in setter 
+    this.context = context;
+    this.operatorContextId = context.getId();
+
     try {
       connFactory.setHost("localhost");
       connection = connFactory.newConnection();
       channel = connection.createChannel();
       channel.exchangeDeclare(exchange, "fanout");
-//      channel.queueDeclare(queueName, false, false, false, null);
+
+      this.idempotentStorageManager.setup(context);
+
     }
     catch (IOException ex) {
       logger.debug(ex.toString());
+      DTThrowable.rethrow(ex);
+    }
+  }
+  
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;    
+    largestRecoveryWindowId = idempotentStorageManager.getLargestRecoveryWindow();
+    if (windowId <= largestRecoveryWindowId) {
+      // Do not resend already sent tuples
+      skipProcessingTuple = true;
+    }
+    else
+    {
+      skipProcessingTuple = false;
     }
   }
+  
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void endWindow()
+  {
+    if(currentWindowId < largestRecoveryWindowId)
+    {
+      // ignore
+      return;
+    }
+    try {
+      idempotentStorageManager.save("processedWindow", operatorContextId, currentWindowId);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
 
   public void setQueueName(String queueName) {
     this.queueName = queueName;
@@ -95,9 +148,19 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
     try {
       channel.close();
       connection.close();
+      this.idempotentStorageManager.teardown();
     }
     catch (IOException ex) {
       logger.debug(ex.toString());
     }
   }
+  
+  public IdempotentStorageManager getIdempotentStorageManager() {
+    return idempotentStorageManager;
+  }
+  
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {    
+    this.idempotentStorageManager = idempotentStorageManager;    
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
index c16f70f..8e6ff39 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
@@ -60,7 +60,10 @@ public abstract class AbstractSinglePortRabbitMQOutputOperator<T> extends Abstra
     @Override
     public void process(T tuple)
     {
-      processTuple(tuple); // This is an abstract call
+      if(!skipProcessingTuple)
+      {
+        processTuple(tuple); // This is an abstract call
+      }
     }
   };
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
index 041e362..a14f4e7 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
@@ -25,17 +25,21 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.contrib.helper.CollectorModule;
 import com.datatorrent.contrib.helper.MessageQueueTestHelper;
-
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Attribute;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
-
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -44,7 +48,7 @@ import com.datatorrent.netlet.util.DTThrowable;
 public class RabbitMQInputOperatorTest
 {
   private static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
-  
+
   public static final class TestStringRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String>
   {
     @Override
@@ -75,7 +79,6 @@ public class RabbitMQInputOperatorTest
       connection = connFactory.newConnection();
       channel = connection.createChannel();
       channel.exchangeDeclare(exchange, "fanout");
-//      channel.queueDeclare(queueName, false, false, false, null);
     }
 
     public void setQueueName(String queueName)
@@ -86,9 +89,7 @@ public class RabbitMQInputOperatorTest
     public void process(Object message) throws IOException
     {
       String msg = message.toString();
-//      logger.debug("publish:" + msg);
       channel.basicPublish(exchange, "", null, msg.getBytes());
-//      channel.basicPublish("", queueName, null, msg.getBytes());
     }
 
     public void teardown() throws IOException
@@ -100,12 +101,11 @@ public class RabbitMQInputOperatorTest
     public void generateMessages(int msgCount) throws InterruptedException, IOException
     {
       for (int i = 0; i < msgCount; i++) {
-        
-        ArrayList<HashMap<String, Integer>>  dataMaps = MessageQueueTestHelper.getMessages();
-        for(int j =0; j < dataMaps.size(); j++)
-        {
-          process(dataMaps.get(j));  
-        }        
+
+        ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages();
+        for (int j = 0; j < dataMaps.size(); j++) {
+          process(dataMaps.get(j));
+        }
       }
     }
 
@@ -124,6 +124,8 @@ public class RabbitMQInputOperatorTest
     LocalMode lma = LocalMode.newInstance();
     DAG dag = lma.getDAG();
     RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
+    consumer.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
     final CollectorModule<byte[]> collector = dag.addOperator("Collector", new CollectorModule<byte[]>());
 
     consumer.setHost("localhost");
@@ -144,7 +146,7 @@ public class RabbitMQInputOperatorTest
       public void run()
       {
         long startTms = System.currentTimeMillis();
-        long timeout = 10000L;
+        long timeout = 100000L;
         try {
           while (!collector.inputPort.collections.containsKey("collector") && System.currentTimeMillis() - startTms < timeout) {
             Thread.sleep(500);
@@ -153,16 +155,14 @@ public class RabbitMQInputOperatorTest
           startTms = System.currentTimeMillis();
           while (System.currentTimeMillis() - startTms < timeout) {
             List<?> list = collector.inputPort.collections.get("collector");
-            
+
             if (list.size() < testNum * 3) {
               Thread.sleep(10);
-            }
-            else {
+            } else {
               break;
             }
           }
-        }
-        catch (IOException ex) {
+        } catch (IOException ex) {
           logger.error(ex.getMessage(), ex);
           DTThrowable.rethrow(ex);
         } catch (InterruptedException ex) {
@@ -179,5 +179,53 @@ public class RabbitMQInputOperatorTest
     logger.debug("collection size: {} {}", collector.inputPort.collections.size(), collector.inputPort.collections);
 
     MessageQueueTestHelper.validateResults(testNum, collector.inputPort.collections);
-  }  
+  }
+
+  @Test
+  public void testRecoveryAndIdempotency() throws Exception
+  {
+    RabbitMQInputOperator operator = new RabbitMQInputOperator();
+    operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    operator.setHost("localhost");
+    operator.setExchange("testEx");
+    operator.setExchangeType("fanout");
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+    operator.outputPort.setSink(sink);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+
+    operator.setup(context);
+    operator.activate(context);
+
+    final RabbitMQMessageGenerator publisher = new RabbitMQMessageGenerator();
+    publisher.setup();
+    publisher.generateMessages(5);
+
+    Thread.sleep(10000);
+
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+
+    operator.deactivate();
+    Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size());
+
+    // failure and then re-deployment of operator
+    sink.collectedTuples.clear();
+    operator.setup(context);
+    operator.activate(context);
+
+    Assert.assertEquals("largest recovery window", 1, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+    operator.beginWindow(1);
+    operator.endWindow();
+    Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size());
+    sink.collectedTuples.clear();
+
+    operator.deactivate();
+    operator.teardown();
+    operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 1);
+    publisher.teardown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
index a170a0e..27213c3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.contrib.helper.SourceModule;
-
+import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
@@ -45,7 +45,7 @@ public class RabbitMQOutputOperatorTest
     public int count = 0;
     private final String host = "localhost";
     ConnectionFactory connFactory = new ConnectionFactory();
-//  QueueingConsumer consumer = null;
+    // QueueingConsumer consumer = null;
     Connection connection = null;
     Channel channel = null;
     TracingConsumer tracingConsumer = null;
@@ -64,8 +64,6 @@ public class RabbitMQOutputOperatorTest
       queueName = channel.queueDeclare().getQueue();
       channel.queueBind(queueName, exchange, "");
 
-//      consumer = new QueueingConsumer(channel);
-//      channel.basicConsume(queueName, true, consumer);
       tracingConsumer = new TracingConsumer(channel);
       cTag = channel.basicConsume(queueName, true, tracingConsumer);
     }
@@ -125,7 +123,6 @@ public class RabbitMQOutputOperatorTest
     }
   }
 
-
   @Test
   public void testDag() throws InterruptedException, MalformedURLException, IOException, Exception
   {
@@ -133,7 +130,7 @@ public class RabbitMQOutputOperatorTest
     runTest(testNum);
     logger.debug("end of test");
   }
-  
+
   protected void runTest(int testNum) throws IOException
   {
     RabbitMQMessageReceiver receiver = new RabbitMQMessageReceiver();
@@ -144,23 +141,22 @@ public class RabbitMQOutputOperatorTest
     SourceModule source = dag.addOperator("source", new SourceModule());
     source.setTestNum(testNum);
     RabbitMQOutputOperator collector = dag.addOperator("generator", new RabbitMQOutputOperator());
-    
+    collector.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
     collector.setExchange("testEx");
     dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
 
     final LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(false);
     lc.runAsync();
-    try {      
+    try {
       Thread.sleep(1000);
       long timeout = 10000L;
       long startTms = System.currentTimeMillis();
-      while((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout))
-      {
+      while ((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout)) {
         Thread.sleep(100);
-      } 
-    }
-    catch (InterruptedException ex) {
+      }
+    } catch (InterruptedException ex) {
       Assert.fail(ex.getMessage());
     } finally {
       lc.shutdown();
@@ -170,11 +166,9 @@ public class RabbitMQOutputOperatorTest
     for (Map.Entry<String, Integer> e : receiver.dataMap.entrySet()) {
       if (e.getKey().equals("a")) {
         Assert.assertEquals("emitted value for 'a' was ", new Integer(2), e.getValue());
-      }
-      else if (e.getKey().equals("b")) {
+      } else if (e.getKey().equals("b")) {
         Assert.assertEquals("emitted value for 'b' was ", new Integer(20), e.getValue());
-      }
-      else if (e.getKey().equals("c")) {
+      } else if (e.getKey().equals("c")) {
         Assert.assertEquals("emitted value for 'c' was ", new Integer(1000), e.getValue());
       }
     }
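
For orientation, a minimal DAG wiring sketch showing how the new idempotency support is switched on (illustrative only; the host/exchange values mirror the tests above, and ConsoleOutputOperator is just a stand-in sink):

  import org.apache.hadoop.conf.Configuration;

  import com.datatorrent.api.DAG;
  import com.datatorrent.api.StreamingApplication;
  import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
  import com.datatorrent.lib.io.ConsoleOutputOperator;
  import com.datatorrent.lib.io.IdempotentStorageManager;

  public class IdempotentRabbitMQApp implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      RabbitMQInputOperator consumer = dag.addOperator("Consumer", new RabbitMQInputOperator());
      consumer.setHost("localhost");
      consumer.setExchange("testEx");       // example exchange name
      consumer.setExchangeType("fanout");
      // enables replay of messages per window after an operator restart
      consumer.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());

      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
      dag.addStream("messages", consumer.outputPort, console.input);
    }
  }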


[07/25] incubator-apex-malhar git commit: Merge pull request #1531 from chandnisingh/feature-bugFixes-3.1.0

Posted by da...@apache.org.
Merge pull request #1531 from chandnisingh/feature-bugFixes-3.1.0

omitting type from FieldInfo

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/8e94665e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/8e94665e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/8e94665e

Branch: refs/heads/feature-AppData
Commit: 8e94665e5430225a609b475e465830d8e19ea872
Parents: f40ba34 f6d85fe
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Aug 10 14:13:26 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Aug 10 14:13:26 2015 -0700

----------------------------------------------------------------------
 library/src/main/java/com/datatorrent/lib/util/FieldInfo.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------



[23/25] incubator-apex-malhar git commit: Fixed backwards compatibility errors.

Posted by da...@apache.org.
Fixed backwards compatibility errors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/59622483
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/59622483
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/59622483

Branch: refs/heads/feature-AppData
Commit: 59622483ebeccbbbf37fb428020fddf38c14627e
Parents: b24e99f
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Fri Aug 28 15:47:52 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:32 2015 -0700

----------------------------------------------------------------------
 demos/pom.xml                                   |  2 +-
 .../query/serde/DataQuerySnapshotValidator.java |  2 +-
 .../lib/appdata/schemas/DataQuerySnapshot.java  |  7 ++++-
 .../lib/appdata/schemas/SchemaUtils.java        | 27 ++++++++++++++++++++
 .../lib/io/PubSubWebSocketAppDataQuery.java     |  8 +++---
 .../lib/io/PubSubWebSocketAppDataResult.java    |  6 ++---
 .../lib/io/WebSocketInputOperator.java          |  2 +-
 .../lib/io/WebSocketOutputOperator.java         |  2 +-
 pom.xml                                         |  4 +--
 9 files changed, 46 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 7976aa0..2ef2fa9 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -28,7 +28,7 @@
   </modules>
 
   <properties>
-    <datatorrent.version>3.1.0-SNAPSHOT</datatorrent.version>
+    <datatorrent.version>3.2.0-SNAPSHOT</datatorrent.version>
     <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
     <semver.plugin.skip>true</semver.plugin.skip>
   </properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
index d2b7ef0..2eb788d 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
@@ -55,7 +55,7 @@ public class DataQuerySnapshotValidator implements CustomMessageValidator
     }
 
     if (gdqt.getFields().getFields().isEmpty()) {
-      gdqt.setFields(new Fields(fields));
+      gdqt.setFieldsVal(new Fields(fields));
     }
 
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
index 5714597..8e68380 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
@@ -136,12 +136,17 @@ public class DataQuerySnapshot extends Query
    * Sets the fields of the query.
    * @param fields The fields of the query.
    */
-  public final void setFields(Fields fields)
+  private void setFields(Fields fields)
   {
     Preconditions.checkNotNull(fields);
     this.fields = fields;
   }
 
+  public void setFieldsVal(Fields fields)
+  {
+    setFields(fields);
+  }
+
   /**
    * Gets the fields of the query.
    * @return The fields of the query.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
index ce68c7f..de3c013 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
@@ -21,6 +21,7 @@ import java.io.StringWriter;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -137,6 +138,13 @@ public class SchemaUtils
   public static boolean checkValidKeys(JSONObject jo,
                                        Collection<Fields> fieldsCollection)
   {
+    return checkValidKeysHelper(jo,
+                                fieldsCollection);
+  }
+
+  private static boolean checkValidKeysHelper(JSONObject jo,
+                                              Collection<Fields> fieldsCollection)
+  {
     for (Fields fields: fieldsCollection) {
       LOG.debug("Checking keys: {}", fields);
       if (checkValidKeys(jo, fields)) {
@@ -151,6 +159,12 @@ public class SchemaUtils
     return false;
   }
 
+  public static boolean checkValidKeys(JSONObject jo, List<Fields> fieldsCollection)
+  {
+    return checkValidKeysHelper(jo,
+                                fieldsCollection);
+  }
+
   /**
    * This is a utility method to check that the given JSONObject has the given keys.
    * It throws an {@link IllegalArgumentException} if it doesn't contain all the given keys.
@@ -161,6 +175,13 @@ public class SchemaUtils
   public static boolean checkValidKeysEx(JSONObject jo,
                                          Collection<Fields> fieldsCollection)
   {
+    return checkValidKeysExHelper(jo,
+                                  fieldsCollection);
+  }
+
+  public static boolean checkValidKeysExHelper(JSONObject jo,
+                                               Collection<Fields> fieldsCollection)
+  {
     for (Fields fields: fieldsCollection) {
       if (checkValidKeys(jo, fields)) {
         return true;
@@ -175,6 +196,12 @@ public class SchemaUtils
                                        fieldsCollection);
   }
 
+  public static boolean checkValidKeysEx(JSONObject jo, List<Fields> fieldsCollection)
+  {
+    return checkValidKeysExHelper(jo,
+                                  fieldsCollection);
+  }
+
   public static Set<String> getSetOfJSONKeys(JSONObject jo)
   {
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 031befd..889a4d3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -68,7 +68,7 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
   @Override
   public void setup(OperatorContext context)
   {
-    this.uri = uriHelper(context, uri);
+    setUri(uriHelper(context, getUri()));
     logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
     super.setup(context);
 
@@ -83,7 +83,7 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
   public void beginWindow(long windowId)
   {
     super.beginWindow(windowId);
-    
+
     if (windowBoundedService != null) {
       windowBoundedService.beginWindow(windowId);
     }
@@ -136,7 +136,7 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
   @Override
   public URI getUri()
   {
-    return uri;
+    return super.getUri();
   }
 
   /**
@@ -148,7 +148,7 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
   @Override
   public void setUri(URI uri)
   {
-    this.uri = uri;
+    super.setUri(uri);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
index 5f0b947..cdae7b8 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
@@ -48,7 +48,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
   @Override
   public void setup(OperatorContext context)
   {
-    this.uri = PubSubWebSocketAppDataQuery.uriHelper(context, uri);
+    setUri(PubSubWebSocketAppDataQuery.uriHelper(context, getUri()));
     logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
     super.setup(context);
   }
@@ -67,7 +67,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
   @Override
   public URI getUri()
   {
-    return uri;
+    return super.getUri();
   }
 
   /**
@@ -79,7 +79,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
   @Override
   public void setUri(URI uri)
   {
-    this.uri = uri;
+    super.setUri(uri);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index 69ebfa3..02b9ef2 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -53,7 +53,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
    */
   public int readTimeoutMillis = 0;
   //Do not make this @NotNull since null is a valid value for some child classes
-  protected URI uri;
+  private URI uri;
   private transient AsyncHttpClient client;
   private transient final JsonFactory jsonFactory = new JsonFactory();
   protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
index f46ccb8..a0cf465 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
@@ -49,7 +49,7 @@ public class WebSocketOutputOperator<T> extends BaseOperator
 {
   private static final Logger LOG = LoggerFactory.getLogger(WebSocketOutputOperator.class);
   //Do not make this @NotNull since null is a valid value for some child classes
-  protected URI uri;
+  private URI uri;
   private transient AsyncHttpClient client;
   private transient final JsonFactory jsonFactory = new JsonFactory();
   protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e9dbcd6..1468163 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>dt-framework</artifactId>
-    <version>3.1.0-SNAPSHOT</version>
+    <version>3.2.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>malhar</artifactId>
@@ -38,7 +38,7 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <netbeans.hint.license>malhar-inc</netbeans.hint.license>
     <maven.deploy.skip>false</maven.deploy.skip>
-    <dt.framework.version>3.1.0-SNAPSHOT</dt.framework.version>
+    <dt.framework.version>3.2.0-SNAPSHOT</dt.framework.version>
     <!-- the following properties match the properties defined in core/pom.xml -->
     <jackson.version>1.9.2</jackson.version>
     <jersey.version>1.9</jersey.version>
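
Elsewhere in this commit, setFields(Fields) on DataQuerySnapshot became private in favor of setFieldsVal(...). A small compatibility sketch for external callers (illustrative only; it assumes Fields can be constructed from a Set of field names, as in the validator change above, and the field names themselves are hypothetical):

  import java.util.Set;

  import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot;
  import com.datatorrent.lib.appdata.schemas.Fields;

  public class SnapshotQueryCompat
  {
    // Pre-3.2 code called query.setFields(...) directly; that setter is now
    // private, so callers go through setFieldsVal(...) instead.
    public static void applyDefaultFields(DataQuerySnapshot query, Set<String> fieldNames)
    {
      if (query.getFields().getFields().isEmpty()) {
        query.setFieldsVal(new Fields(fieldNames)); // assumes Fields(Set<String>) constructor
      }
    }
  }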


[10/25] incubator-apex-malhar git commit: Merge branch 'ishark-rabbitMQ' into v3.1.0

Posted by da...@apache.org.
Merge branch 'ishark-rabbitMQ' into v3.1.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0b3bb88d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0b3bb88d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0b3bb88d

Branch: refs/heads/feature-AppData
Commit: 0b3bb88d8ebd2e52d483f77f7951eb56767f32df
Parents: 8e94665 4dc4788
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Wed Aug 12 21:08:44 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Wed Aug 12 21:08:44 2015 -0700

----------------------------------------------------------------------
 .../rabbitmq/AbstractRabbitMQInputOperator.java | 142 +++++++++++++++++--
 .../AbstractRabbitMQOutputOperator.java         |  65 ++++++++-
 ...bstractSinglePortRabbitMQOutputOperator.java |   5 +-
 .../rabbitmq/RabbitMQInputOperatorTest.java     |  86 ++++++++---
 .../rabbitmq/RabbitMQOutputOperatorTest.java    |  28 ++--
 5 files changed, 278 insertions(+), 48 deletions(-)
----------------------------------------------------------------------



[15/25] incubator-apex-malhar git commit: Fix version compatibility issue for Abstract redis store input operator. Reverted back to original super class

Posted by da...@apache.org.
Fix version compatibility issue for Abstract redis store input operator. Reverted back to original super class


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/731b8bbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/731b8bbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/731b8bbe

Branch: refs/heads/feature-AppData
Commit: 731b8bbe3b09ac54a5c5a985e1e76667cc417002
Parents: 0b31fee
Author: ishark <is...@datatorrent.com>
Authored: Tue Aug 18 13:36:21 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Tue Aug 18 13:36:21 2015 -0700

----------------------------------------------------------------------
 .../contrib/redis/AbstractRedisInputOperator.java           | 7 +++++--
 .../contrib/redis/RedisKeyValueInputOperator.java           | 9 +++++++++
 .../contrib/redis/RedisMapAsValueInputOperator.java         | 8 ++++++++
 .../datatorrent/contrib/redis/RedisPOJOInputOperator.java   | 7 +++++++
 4 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 260fbf6..5e62dbb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -19,13 +19,16 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+
 import javax.validation.constraints.NotNull;
+
 import redis.clients.jedis.ScanParams;
 import redis.clients.jedis.ScanResult;
+
 import com.datatorrent.api.Operator.CheckpointListener;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 
 /**
@@ -39,7 +42,7 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
  *          The tuple type.
  * @since 0.9.3
  */
-public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
+public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointListener
 {
   protected transient List<String> keys = new ArrayList<String>();
   protected transient Integer scanOffset;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
index 8f419bd..0d0efe8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
@@ -18,6 +18,8 @@ package com.datatorrent.contrib.redis;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+
 import com.datatorrent.lib.util.KeyValPair;
 
 /**
@@ -52,4 +54,11 @@ public class RedisKeyValueInputOperator extends AbstractRedisInputOperator<KeyVa
       keysObjectList.clear();
     }
   }
+
+  @Override
+  public KeyValPair<String, String> convertToTuple(Map<Object, Object> o)
+  {
+    // Do nothing in this override; scanning and emission are done in processTuples
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
index 66ef582..a7f0cd2 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
@@ -16,6 +16,7 @@
 package com.datatorrent.contrib.redis;
 
 import java.util.Map;
+
 import com.datatorrent.lib.util.KeyValPair;
 
 /**
@@ -42,4 +43,11 @@ public class RedisMapAsValueInputOperator extends AbstractRedisInputOperator<Key
     }
     keys.clear();
   }
+
+  @Override
+  public KeyValPair<String, Map<String, String>> convertToTuple(Map<Object, Object> o)
+  {
+    // Do nothing in this override; emission is already handled in processTuples
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
index 5a73e61..ac3f7fc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
@@ -201,4 +201,11 @@ public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPai
   {
     this.dataColumns = dataColumns;
   }
+
+  @Override
+  public KeyValPair<String, Object> convertToTuple(Map<Object, Object> o)
+  {
+    // Do nothing in this override; scanning and emission are done in processTuples
+    return null;
+  }
 }

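Because the abstract operator again extends AbstractKeyValueStoreInputOperator, each concrete subclass has to supply convertToTuple even when it emits directly from processTuples, which is why the overrides above simply return null. A minimal sketch of that pattern (the class name and tuple type are hypothetical, not part of this commit):

    // Hypothetical subclass illustrating the no-op convertToTuple pattern used above.
    public class KeysOnlyRedisInputOperator extends AbstractRedisInputOperator<String>
    {
      @Override
      public void processTuples()
      {
        // Emit the keys gathered by the base class scan and clear them for the next pass.
        for (String key : keys) {
          outputPort.emit(key);
        }
        keys.clear();
      }

      @Override
      public String convertToTuple(java.util.Map<Object, Object> o)
      {
        // Unused: emission already happens in processTuples(), as in the concrete operators above.
        return null;
      }
    }
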

[11/25] incubator-apex-malhar git commit: MLHR-1748 #resolve Created concrete input and output operators for Redis Store. Added test cases for the same.

Posted by da...@apache.org.
MLHR-1748 #resolve Created concrete input and output operators for Redis Store.
Added test cases for the same.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a57a3d75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a57a3d75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a57a3d75

Branch: refs/heads/feature-AppData
Commit: a57a3d756bafc1269c827a71d4fea549abdf65dd
Parents: f40ba34
Author: ishark <is...@datatorrent.com>
Authored: Mon Jun 29 15:52:25 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 14 12:07:36 2015 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 |   8 +-
 .../redis/AbstractRedisInputOperator.java       | 224 +++++++++++++++++-
 .../redis/RedisKeyValueInputOperator.java       |  55 +++++
 .../redis/RedisMapAsValueInputOperator.java     |  45 ++++
 .../contrib/redis/RedisPOJOInputOperator.java   | 204 ++++++++++++++++
 .../contrib/redis/RedisPOJOOutputOperator.java  | 155 +++++++++++++
 .../datatorrent/contrib/redis/RedisStore.java   |  27 +++
 .../contrib/redis/RedisInputOperatorTest.java   | 193 ++++++++++++++++
 .../contrib/redis/RedisPOJOOperatorTest.java    | 230 +++++++++++++++++++
 demos/machinedata/pom.xml                       |   2 +-
 10 files changed, 1138 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 9776e2f..50d7234 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -181,6 +181,12 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.8.1.1</version>
@@ -382,7 +388,7 @@
     <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
-      <version>2.2.1</version>
+      <version>2.5.1</version>
       <optional>true</optional>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index ff7a9a5..7f79bd0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -15,11 +15,22 @@
  */
 package com.datatorrent.contrib.redis;
 
-import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
 
 /**
  * This is the base implementation of a Redis input operator.
- * <p></p>
+ * 
  * @displayName Abstract Redis Input
  * @category Input
  * @tags redis, key value
@@ -27,6 +38,213 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
  * @param <T> The tuple type.
  * @since 0.9.3
  */
-public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore>
+public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
 {
+  protected transient List<String> keys = new ArrayList<String>();
+  protected transient Integer scanOffset;
+  protected transient ScanParams scanParameters;
+  private transient boolean scanComplete;
+  private transient Integer backupOffset;
+  private int scanCount;
+  private transient boolean replay;
+
+  @NotNull
+  private IdempotentStorageManager idempotentStorageManager;
+
+  private transient OperatorContext context;
+  private transient long currentWindowId;
+  private transient Integer sleepTimeMillis;
+  private transient Integer scanCallsInCurrentWindow;
+  private RecoveryState recoveryState;
+
+  /*
+   * Recovery state contains the last offset processed in a window and the
+   * number of times ScanKeys was invoked in that window. We need to capture the
+   * number of ScanKeys calls because the offset returned by a scan is not
+   * always monotonically increasing. Storing both the offset and the scan count
+   * for each window guarantees idempotency per window.
+   */
+  public static class RecoveryState implements Serializable
+  {
+    public Integer scanOffsetAtBeginWindow, numberOfScanCallsInWindow;
+  }
+
+  public AbstractRedisInputOperator()
+  {
+    scanCount = 100;
+    recoveryState = new RecoveryState();
+    recoveryState.scanOffsetAtBeginWindow = 0;
+    recoveryState.numberOfScanCallsInWindow = 0;
+    setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    scanCallsInCurrentWindow = 0;
+    replay = false;
+    if (currentWindowId <= getIdempotentStorageManager().getLargestRecoveryWindow()) {
+      replay(windowId);
+    }
+  }
+
+  private void replay(long windowId)
+  {
+    try {
+      if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+        // Begin offset for this window is recovery offset stored for the last
+        // window
+        RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1);
+        recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
+      }
+
+      RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId);
+      recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
+      if (recoveryState.scanOffsetAtBeginWindow != null) {
+        scanOffset = recoveryState.scanOffsetAtBeginWindow;
+      }
+      replay = true;
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException
+  {
+    long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId());
+    if (windowsIds.length == 0 || windowId < windowsIds[0] || windowId > windowsIds[windowsIds.length - 1]) {
+      return false;
+    }
+    return true;
+  }
+
+  private void scanKeysFromOffset()
+  {
+    if (!scanComplete) {
+      if (replay && scanCallsInCurrentWindow >= recoveryState.numberOfScanCallsInWindow) {
+        try {
+          Thread.sleep(sleepTimeMillis);
+        } catch (InterruptedException e) {
+          DTThrowable.rethrow(e);
+        }
+        return;
+      }
+
+      ScanResult<String> result = store.ScanKeys(scanOffset, scanParameters);
+      backupOffset = scanOffset;
+      scanOffset = Integer.parseInt(result.getStringCursor());
+      if (scanOffset == 0) {
+        // Redis store returns 0 after all data is read
+        scanComplete = true;
+
+        // point scanOffset to the end in this case for reading any new tuples
+        scanOffset = backupOffset + result.getResult().size();
+      }
+      keys = result.getResult();
+    }
+    scanCallsInCurrentWindow++;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    sleepTimeMillis = context.getValue(context.SPIN_MILLIS);
+    getIdempotentStorageManager().setup(context);
+    this.context = context;
+    scanOffset = 0;
+    scanComplete = false;
+    scanParameters = new ScanParams();
+    scanParameters.count(scanCount);
+    // For the first window after a checkpoint, windowId - 1 would not have a
+    // recovery offset stored in the idempotentStorageManager.
+    // But recoveryState is non-transient, so it is restored as part of
+    // checkpointing.
+    scanOffset = recoveryState.scanOffsetAtBeginWindow;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    while (replay && scanCallsInCurrentWindow < recoveryState.numberOfScanCallsInWindow) {
+      // If fewer keys were scanned in this window, keep scanning till the recovery offset
+      scanKeysFromOffset();
+      processTuples();
+    }
+    super.endWindow();
+    recoveryState.scanOffsetAtBeginWindow = scanOffset;
+    recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow;
+
+    if (currentWindowId > getIdempotentStorageManager().getLargestRecoveryWindow()) {
+      try {
+        getIdempotentStorageManager().save(recoveryState, context.getId(), currentWindowId);
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    super.teardown();
+    getIdempotentStorageManager().teardown();
+  }
+
+  /*
+   * get number of keys to read for each redis key scan
+   */
+  public int getScanCount()
+  {
+    return scanCount;
+  }
+
+  /*
+   * set number of keys to read for each redis key scan
+   */
+  public void setScanCount(int scanCount)
+  {
+    this.scanCount = scanCount;
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    scanKeysFromOffset();
+    processTuples();
+  }
+
+  abstract public void processTuples();
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      getIdempotentStorageManager().deleteUpTo(context.getId(), windowId);
+    } catch (IOException e) {
+      throw new RuntimeException("committing", e);
+    }
+  }
+
+  /*
+   * get Idempotent Storage manager instance
+   */
+  public IdempotentStorageManager getIdempotentStorageManager()
+  {
+    return idempotentStorageManager;
+  }
+
+  /*
+   * set Idempotent storage manager instance
+   */
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+  {
+    this.idempotentStorageManager = idempotentStorageManager;
+  }
 }

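The operator replays deterministically only when a persistent IdempotentStorageManager is configured; the constructor defaults to the no-op manager. A minimal configuration sketch, mirroring the recovery test added later in this commit (it assumes a reachable local Redis instance):

    // Sketch: enable idempotent replay on the concrete key-value operator.
    RedisKeyValueInputOperator input = new RedisKeyValueInputOperator();
    input.setStore(new RedisStore());
    input.setScanCount(100);   // keys requested per SCAN call
    input.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
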
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
new file mode 100644
index 0000000..8f419bd
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an implementation of a Redis input operator for fetching key-value
+ * pairs stored in Redis. It takes in keys to fetch and emits the corresponding
+ * <Key, Value> pairs. The value data type is String in this case.
+ * 
+ * @displayName Redis Input Operator for Key Value pair
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+public class RedisKeyValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, String>>
+{
+  private List<Object> keysObjectList = new ArrayList<Object>();
+
+  @Override
+  public void processTuples()
+  {
+    keysObjectList = new ArrayList<Object>(keys);
+    if (keysObjectList.size() > 0) {
+
+      List<Object> allValues = store.getAll(keysObjectList);
+      for (int i = 0; i < allValues.size() && i < keys.size(); i++) {
+        if (allValues.get(i) == null) {
+          outputPort.emit(new KeyValPair<String, String>(keys.get(i), null));
+        } else {
+          outputPort.emit(new KeyValPair<String, String>(keys.get(i), allValues.get(i).toString()));
+        }
+      }
+      keys.clear();
+      keysObjectList.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
new file mode 100644
index 0000000..66ef582
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.Map;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an implementation of a Redis input operator. It takes in keys to
+ * fetch and emits values stored as maps in Redis, i.e. when the value data
+ * type in Redis is a hash map.
+ * 
+ * @displayName Redis Input Operator for Map
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+
+public class RedisMapAsValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Map<String, String>>>
+{
+  @Override
+  public void processTuples()
+  {
+    for (String key : keys) {
+      if (store.getType(key).equals("hash")) {
+        Map<String, String> mapValue = store.getMap(key);
+        outputPort.emit(new KeyValPair<String, Map<String, String>>(key, mapValue));
+      }
+    }
+    keys.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
new file mode 100644
index 0000000..5a73e61
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
@@ -0,0 +1,204 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Setter;
+import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.SetterDouble;
+import com.datatorrent.lib.util.PojoUtils.SetterFloat;
+import com.datatorrent.lib.util.PojoUtils.SetterInt;
+import com.datatorrent.lib.util.PojoUtils.SetterLong;
+import com.datatorrent.lib.util.PojoUtils.SetterShort;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is a Redis input operator which scans all keys in the Redis store. It
+ * converts each value stored as a map into a plain old Java object and outputs
+ * a KeyValPair with the POJO as the value.
+ * <p>
+ * This input adapter reads from a RedisStore where values are stored as <Key, Map>
+ * and outputs key-value pairs <key, POJO> as tuples.
+ * </p>
+ *
+ * @displayName Redis POJO Input Operator
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+@Evolving
+public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Object>>
+{
+  protected final Map<String, Object> map = new HashMap<String, Object>();
+  private ArrayList<FieldInfo> dataColumns;
+  private transient ArrayList<Object> setters;
+  private boolean isFirstTuple = true;
+  private String outputClass;
+  private Class<?> objectClass;
+
+  public RedisPOJOInputOperator()
+  {
+    super();
+    setters = new ArrayList<Object>();
+  }
+
+  @SuppressWarnings("unchecked")
+  private Object convertMapToObject(Map<String, String> tuple)
+  {
+    try {
+      Object mappedObject = objectClass.newInstance();
+      for (int i = 0; i < dataColumns.size(); i++) {
+        final SupportType type = dataColumns.get(i).getType();
+        final String columnName = dataColumns.get(i).getColumnName();
+
+        if (i < setters.size()) {
+          String value = tuple.get(columnName);
+          switch (type) {
+            case STRING:
+              ((Setter<Object, String>) setters.get(i)).set(mappedObject, value);
+              break;
+            case BOOLEAN:
+              ((SetterBoolean) setters.get(i)).set(mappedObject, Boolean.parseBoolean(value));
+              break;
+            case SHORT:
+              ((SetterShort) setters.get(i)).set(mappedObject, Short.parseShort(value));
+              break;
+            case INTEGER:
+              ((SetterInt) setters.get(i)).set(mappedObject, Integer.parseInt(value));
+              break;
+            case LONG:
+              ((SetterLong) setters.get(i)).set(mappedObject, Long.parseLong(value));
+              break;
+            case FLOAT:
+              ((SetterFloat) setters.get(i)).set(mappedObject, Float.parseFloat(value));
+              break;
+            case DOUBLE:
+              ((SetterDouble) setters.get(i)).set(mappedObject, Double.parseDouble(value));
+              break;
+            default:
+              break;
+          }
+        }
+      }
+      return mappedObject;
+    } catch (Exception e) {
+      DTThrowable.wrapIfChecked(e);
+    }
+    return null;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+  }
+
+  public void processFirstTuple(Map<String, String> value) throws ClassNotFoundException
+  {
+    objectClass = Class.forName(getOutputClass());
+
+    final int size = dataColumns.size();
+    for (int i = 0; i < size; i++) {
+      final SupportType type = dataColumns.get(i).getType();
+      final String getterExpression = dataColumns.get(i).getPojoFieldExpression();
+      final Object setter;
+      switch (type) {
+        case STRING:
+          setter = PojoUtils.createSetter(objectClass, getterExpression, String.class);
+          break;
+        case BOOLEAN:
+          setter = PojoUtils.createSetterBoolean(objectClass, getterExpression);
+          break;
+        case SHORT:
+          setter = PojoUtils.createSetterShort(objectClass, getterExpression);
+          break;
+        case INTEGER:
+          setter = PojoUtils.createSetterInt(objectClass, getterExpression);
+          break;
+        case LONG:
+          setter = PojoUtils.createSetterLong(objectClass, getterExpression);
+          break;
+        case FLOAT:
+          setter = PojoUtils.createSetterFloat(objectClass, getterExpression);
+          break;
+        case DOUBLE:
+          setter = PojoUtils.createSetterDouble(objectClass, getterExpression);
+          break;
+        default:
+          setter = PojoUtils.createSetter(objectClass, getterExpression, Object.class);
+          break;
+      }
+      setters.add(setter);
+    }
+  }
+
+  @Override
+  public void processTuples()
+  {
+    for (String key : keys) {
+      if (store.getType(key).equals("hash")) {
+        Map<String, String> mapValue = store.getMap(key);
+        if (isFirstTuple) {
+          try {
+            processFirstTuple(mapValue);
+          } catch (ClassNotFoundException e) {
+            DTThrowable.rethrow(e);
+          }
+        }
+        isFirstTuple = false;
+        outputPort.emit(new KeyValPair<String, Object>(key, convertMapToObject(mapValue)));
+      }
+    }
+    keys.clear();
+  }
+
+  /*
+   * Output class type
+   */
+  public String getOutputClass()
+  {
+    return outputClass;
+  }
+
+  public void setOutputClass(String outputClass)
+  {
+    this.outputClass = outputClass;
+  }
+
+  /*
+   * An ArrayList of field metadata used to map the Redis hash entries to POJO
+   * fields: column names, POJO field expressions and column data types.
+   */
+  public ArrayList<FieldInfo> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  public void setDataColumns(ArrayList<FieldInfo> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+}

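Using the POJO input operator requires the fully qualified output class name plus the field metadata from which the setters are generated. A short sketch based on the RedisPOJOOperatorTest configuration in this commit (TestClass is the test's sample POJO):

    // Sketch: map Redis hash fields onto a POJO emitted by the input operator.
    RedisPOJOInputOperator input = new RedisPOJOInputOperator();
    ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
    fields.add(new FieldInfo("Column1", "stringValue", SupportType.STRING));
    fields.add(new FieldInfo("Column2", "intValue", SupportType.INTEGER));
    input.setDataColumns(fields);
    input.setOutputClass(TestClass.class.getName());
    input.setStore(new RedisStore());
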
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
new file mode 100644
index 0000000..8966248
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
+/**
+ * This is a Redis output operator which takes a key and a corresponding plain
+ * old Java object as input and writes a map out to Redis based on the field
+ * expressions provided.
+ * <p>
+ * This output adapter takes key-value pairs <key, POJO> as tuples and writes
+ * them to the Redis store using the key, the value being a map of the object's
+ * attributes as <key, value> entries. Note: the Redis output operator should
+ * never use the passthrough method because it begins a transaction at
+ * beginWindow and commits it at endWindow, and a transaction in Redis blocks
+ * all other clients.
+ * </p>
+ *
+ * @displayName Redis POJO Output Operator
+ * @category Store
+ * @tags output operator, key value
+ *
+ */
+public class RedisPOJOOutputOperator extends AbstractRedisAggregateOutputOperator<KeyValPair<String, Object>>
+{
+  protected final Map<String, Object> map = new HashMap<String, Object>();
+  private ArrayList<FieldInfo> dataColumns;
+  private transient ArrayList<Object> getters;
+  private boolean isFirstTuple = true;
+
+  public RedisPOJOOutputOperator()
+  {
+    super();
+    getters = new ArrayList<Object>();
+  }
+
+  @Override
+  public void storeAggregate()
+  {
+    for (Entry<String, Object> entry : map.entrySet()) {
+
+      Map<String, String> mapObject = convertObjectToMap(entry.getValue());
+      store.put(entry.getKey(), mapObject);
+    }
+  }
+
+  private Map<String, String> convertObjectToMap(Object tuple)
+  {
+
+    Map<String, String> mappedObject = new HashMap<String, String>();
+    for (int i = 0; i < dataColumns.size(); i++) {
+      final SupportType type = dataColumns.get(i).getType();
+      final String columnName = dataColumns.get(i).getColumnName();
+
+      if (i < getters.size()) {
+        Getter<Object, Object> obj = (Getter<Object, Object>) (getters.get(i));
+
+        Object value = obj.get(tuple);
+        mappedObject.put(columnName, value.toString());
+      }
+    }
+
+    return mappedObject;
+  }
+
+  public void processFirstTuple(KeyValPair<String, Object> tuple)
+  {
+    // Create getters using first value entry in map
+    // Entry<String, Object> entry= tuple.entrySet().iterator().next();
+    Object value = tuple.getValue();
+
+    final Class<?> fqcn = value.getClass();
+    final int size = dataColumns.size();
+    for (int i = 0; i < size; i++) {
+      final SupportType type = dataColumns.get(i).getType();
+      final String getterExpression = dataColumns.get(i).getPojoFieldExpression();
+      final Object getter;
+      switch (type) {
+        case STRING:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+          break;
+        case BOOLEAN:
+          getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+          break;
+        case SHORT:
+          getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+          break;
+        case INTEGER:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, type.getJavaType());
+          break;
+        case LONG:
+          getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+          break;
+        case FLOAT:
+          getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+          break;
+        case DOUBLE:
+          getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+          break;
+        default:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+          break;
+      }
+      getters.add(getter);
+    }
+  }
+
+  @Override
+  public void processTuple(KeyValPair<String, Object> tuple)
+  {
+    if (isFirstTuple) {
+      processFirstTuple(tuple);
+    }
+
+    isFirstTuple = false;
+    map.put(tuple.getKey(), tuple.getValue());
+  }
+
+  /*
+   * An arraylist of data column names to be set in Redis store as a Map. Gets
+   * column names, column expressions and column data types
+   */
+  public ArrayList<FieldInfo> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  public void setDataColumns(ArrayList<FieldInfo> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+}

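The output operator is configured symmetrically, except that its field expressions are read through getters; the test in this commit uses both a plain field name and an explicit getter expression. A brief sketch along the same lines:

    // Sketch: write each POJO out to Redis as a hash keyed by the tuple key.
    RedisPOJOOutputOperator output = new RedisPOJOOutputOperator();
    ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
    fields.add(new FieldInfo("column1", "intValue", SupportType.INTEGER));          // field expression
    fields.add(new FieldInfo("column2", "getStringValue()", SupportType.STRING));   // getter expression
    output.setDataColumns(fields);
    output.setStore(new RedisStore());
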
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
index ea8e26b..2acc1d5 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
 import redis.clients.jedis.Transaction;
 
 import com.datatorrent.lib.db.TransactionableKeyValueStore;
@@ -181,6 +183,26 @@ public class RedisStore implements TransactionableKeyValueStore
     return jedis.get(key.toString());
   }
 
+  public String getType(String key)
+  {
+    return jedis.type(key);
+  }
+
+  /**
+   * Gets the stored map for the given key, when the value data type is a map stored with hmset.
+   *
+   * @param key
+   * @return hashmap stored for the key.
+   */
+  public Map<String, String> getMap(Object key)
+  {
+    if (isInTransaction()) {
+      throw new RuntimeException("Cannot call get when in redis transaction");
+    }
+    return jedis.hgetAll(key.toString());
+  }
+
+
   /**
    * Gets all the values given the keys.
    * Note that it does NOT work with hash values or list values
@@ -255,6 +277,11 @@ public class RedisStore implements TransactionableKeyValueStore
     }
   }
 
+  public ScanResult<String> ScanKeys(Integer offset, ScanParams params)
+  {
+    return jedis.scan(offset.toString(), params);
+  }
+
   /**
    * Calls hincrbyfloat on the redis store.
    *

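The new store methods follow Jedis semantics: SCAN hands back a cursor that becomes "0" once the keyspace has been fully traversed, and hash values written with hmset are read back with hgetAll. A small sketch of how the additions could be used directly, assuming a reachable Redis instance:

    // Sketch: scan the keyspace and read hash values through the new RedisStore methods.
    RedisStore store = new RedisStore();
    store.connect();
    ScanParams params = new ScanParams();
    params.count(100);
    ScanResult<String> result = store.ScanKeys(0, params);   // a returned cursor of 0 means the scan is complete
    for (String key : result.getResult()) {
      if ("hash".equals(store.getType(key))) {
        Map<String, String> value = store.getMap(key);       // value stored with hmset
        System.out.println(key + " -> " + value);
      }
    }
    store.disconnect();
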
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
new file mode 100644
index 0000000..08fb294
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
@@ -0,0 +1,193 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RedisInputOperatorTest
+{
+  private RedisStore operatorStore;
+  private RedisStore testStore;
+
+  public static class CollectorModule extends BaseOperator
+  {
+    volatile static List<KeyValPair<String, String>> resultMap = new ArrayList<KeyValPair<String, String>>();
+    static long resultCount = 0;
+
+    public final transient DefaultInputPort<KeyValPair<String, String>> inputPort = new DefaultInputPort<KeyValPair<String, String>>()
+    {
+      @Override
+      public void process(KeyValPair<String, String> tuple)
+      {
+        resultMap.add(tuple);
+        resultCount++;
+      }
+    };
+  }
+
+  @Test
+  public void testInputOperator() throws IOException
+  {
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    testStore.connect();
+    ScanParams params = new ScanParams();
+    params.count(1);
+
+    testStore.put("test_abc", "789");
+    testStore.put("test_def", "456");
+    testStore.put("test_ghi", "123");
+
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      DAG dag = lma.getDAG();
+
+      RedisKeyValueInputOperator inputOperator = dag.addOperator("input", new RedisKeyValueInputOperator());
+      final CollectorModule collector = dag.addOperator("collector", new CollectorModule());
+
+      inputOperator.setStore(operatorStore);
+      dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+      final LocalMode.Controller lc = lma.getController();
+
+      new Thread("LocalClusterController")
+      {
+        @Override
+        public void run()
+        {
+          long startTms = System.currentTimeMillis();
+          long timeout = 50000L;
+          try {
+            Thread.sleep(1000);
+            while (System.currentTimeMillis() - startTms < timeout) {
+              if (CollectorModule.resultMap.size() < 3) {
+                Thread.sleep(10);
+              } else {
+                break;
+              }
+            }
+          } catch (InterruptedException ex) {
+          }
+          lc.shutdown();
+        }
+      }.start();
+
+      lc.run();
+
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_abc", "789")));
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_def", "456")));
+      Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_ghi", "123")));
+    } finally {
+      for (KeyValPair<String, String> entry : CollectorModule.resultMap) {
+        testStore.remove(entry.getKey());
+      }
+      testStore.disconnect();
+    }
+  }
+
+  @Test
+  public void testRecoveryAndIdempotency() throws Exception
+  {
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    testStore.connect();
+    ScanParams params = new ScanParams();
+    params.count(1);
+
+    testStore.put("test_abc", "789");
+    testStore.put("test_def", "456");
+    testStore.put("test_ghi", "123");
+
+    RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator();
+    operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    
+    operator.setStore(operatorStore);
+    operator.setScanCount(1);
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+    operator.outputPort.setSink(sink);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+
+    try {
+      operator.setup(context);
+      operator.beginWindow(1);
+      operator.emitTuples();
+      operator.endWindow();
+
+      int numberOfMessagesInWindow1 = sink.collectedTuples.size();
+      sink.collectedTuples.clear();
+
+      operator.beginWindow(2);
+      operator.emitTuples();
+      operator.endWindow();
+      int numberOfMessagesInWindow2 = sink.collectedTuples.size();
+      sink.collectedTuples.clear();
+
+      // failure and then re-deployment of operator
+      // Re-instantiating to reset values
+      operator = new RedisKeyValueInputOperator();
+      operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+      operator.setStore(operatorStore);
+      operator.setScanCount(1);
+      operator.outputPort.setSink(sink);
+      operator.setup(context);
+
+      Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+
+      operator.beginWindow(1);
+      operator.emitTuples();
+      operator.emitTuples();
+      operator.endWindow();
+
+      Assert.assertEquals("num of messages in window 1", numberOfMessagesInWindow1, sink.collectedTuples.size());
+
+      sink.collectedTuples.clear();
+      operator.beginWindow(2);
+      operator.emitTuples();
+      operator.endWindow();
+      Assert.assertEquals("num of messages in window 2",numberOfMessagesInWindow2, sink.collectedTuples.size());
+    } finally {
+      for (Object e : sink.collectedTuples) {
+        KeyValPair<String, String> entry = (KeyValPair<String, String>) e;
+        testStore.remove(entry.getKey());
+      }
+      sink.collectedTuples.clear();
+      operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 5);
+      operator.teardown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
new file mode 100644
index 0000000..7792b5a
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
@@ -0,0 +1,230 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.redis.RedisInputOperatorTest.CollectorModule;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RedisPOJOOperatorTest
+{
+  private RedisStore operatorStore;
+  private RedisStore testStore;
+
+  public static class TestClass
+  {
+    private Integer intValue;
+    private String stringValue;
+
+    public TestClass()
+    {
+    }
+
+    public TestClass(int v1, String v2)
+    {
+      intValue = v1;
+      stringValue = v2;
+    }
+
+    public Integer getIntValue()
+    {
+      return intValue;
+    }
+
+    public void setIntValue(int intValue)
+    {
+      this.intValue = intValue;
+    }
+
+    public String getStringValue()
+    {
+      return stringValue;
+    }
+
+    public void setStringValue(String stringValue)
+    {
+      this.stringValue = stringValue;
+    }
+  }
+
+  @Test
+  public void testOutputOperator() throws IOException
+  {
+    this.operatorStore = new RedisStore();
+
+    operatorStore.connect();
+    String appId = "test_appid";
+    int operatorId = 0;
+
+    operatorStore.removeCommittedWindowId(appId, operatorId);
+    operatorStore.disconnect();
+
+    RedisPOJOOutputOperator outputOperator = new RedisPOJOOutputOperator();
+
+    ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+    fields.add(new FieldInfo("column1", "intValue", SupportType.INTEGER));
+    fields.add(new FieldInfo("column2", "getStringValue()", SupportType.STRING));
+
+    outputOperator.setDataColumns(fields);
+
+    try {
+      com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_ID, appId);
+
+      outputOperator.setStore(operatorStore);
+      outputOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes));
+      outputOperator.beginWindow(101);
+
+      KeyValPair<String, Object> keyVal = new KeyValPair<String, Object>("test_abc1", new TestClass(1, "abc"));
+
+      outputOperator.input.process(keyVal);
+
+      outputOperator.endWindow();
+
+      outputOperator.teardown();
+
+      operatorStore.connect();
+
+      Map<String, String> out = operatorStore.getMap("test_abc1");
+      Assert.assertEquals("1", out.get("column1"));
+      Assert.assertEquals("abc", out.get("column2"));
+    } finally {
+      operatorStore.remove("test_abc1");
+      operatorStore.disconnect();
+    }
+  }
+
+  public static class ObjectCollectorModule extends BaseOperator
+  {
+    volatile static Map<String, Object> resultMap = new HashMap<String, Object>();
+    static long resultCount = 0;
+
+    public final transient DefaultInputPort<KeyValPair<String, Object>> inputPort = new DefaultInputPort<KeyValPair<String, Object>>()
+    {
+      @Override
+      public void process(KeyValPair<String, Object> tuple)
+      {
+        resultMap.put(tuple.getKey(), tuple.getValue());
+        resultCount++;
+      }
+    };
+  }
+
+  @Test
+  public void testInputOperator() throws IOException
+  {
+    @SuppressWarnings("unused")
+    Class<?> clazz = org.codehaus.janino.CompilerFactory.class;
+
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    testStore.connect();
+    ScanParams params = new ScanParams();
+    params.count(100);
+
+    Map<String, String> value = new HashMap<String, String>();
+    value.put("Column1", "abc");
+    value.put("Column2", "1");
+
+    Map<String, String> value1 = new HashMap<String, String>();
+    value1.put("Column1", "def");
+    value1.put("Column2", "2");
+
+    Map<String, String> value2 = new HashMap<String, String>();
+    value2.put("Column1", "ghi");
+    value2.put("Column2", "3");
+
+    testStore.put("test_abc_in", value);
+    testStore.put("test_def_in", value1);
+    testStore.put("test_ghi_in", value2);
+
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      DAG dag = lma.getDAG();
+
+      RedisPOJOInputOperator inputOperator = dag.addOperator("input", new RedisPOJOInputOperator());
+      final ObjectCollectorModule collector = dag.addOperator("collector", new ObjectCollectorModule());
+
+      ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+      fields.add(new FieldInfo("Column1", "stringValue", SupportType.STRING));
+      fields.add(new FieldInfo("Column2", "intValue", SupportType.INTEGER));
+
+      inputOperator.setDataColumns(fields);
+      inputOperator.setOutputClass(TestClass.class.getName());
+
+      inputOperator.setStore(operatorStore);
+      dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+      final LocalMode.Controller lc = lma.getController();
+
+      new Thread("LocalClusterController")
+      {
+        @Override
+        public void run()
+        {
+          long startTms = System.currentTimeMillis();
+          long timeout = 10000L;
+          try {
+            Thread.sleep(1000);
+            while (System.currentTimeMillis() - startTms < timeout) {
+              if (ObjectCollectorModule.resultMap.size() < 3) {
+                Thread.sleep(10);
+              } else {
+                break;
+              }
+            }
+          } catch (InterruptedException ex) {
+          }
+          lc.shutdown();
+        }
+      }.start();
+
+      lc.run();
+
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_abc_in"));
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_def_in"));
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_ghi_in"));
+
+      TestClass a = (TestClass) ObjectCollectorModule.resultMap.get("test_abc_in");
+      Assert.assertNotNull(a);
+      Assert.assertEquals("abc", a.stringValue);
+      Assert.assertEquals("1", a.intValue.toString());
+    } finally {
+      for (KeyValPair<String, String> entry : CollectorModule.resultMap) {
+        testStore.remove(entry.getKey());
+      }
+      testStore.disconnect();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/demos/machinedata/pom.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/pom.xml b/demos/machinedata/pom.xml
index 1f3f075..3498d0d 100644
--- a/demos/machinedata/pom.xml
+++ b/demos/machinedata/pom.xml
@@ -31,7 +31,7 @@
     <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
-      <version>2.2.1</version>
+      <version>2.5.1</version>
     </dependency>
     <dependency>
       <groupId>javax.mail</groupId>



[25/25] incubator-apex-malhar git commit: Made the twitter demo use an embeddable query operator.

Posted by da...@apache.org.
Made the twitter demo use an embeddable query operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/819e175f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/819e175f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/819e175f

Branch: refs/heads/feature-AppData
Commit: 819e175f54ab9869d932ec04b73cfaff451387ac
Parents: 6cff911
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Aug 11 15:23:45 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:32 2015 -0700

----------------------------------------------------------------------
 demos/pom.xml                                   |   2 +-
 .../twitter/TwitterTopCounterApplication.java   |   8 +-
 .../src/main/resources/META-INF/properties.xml  |   4 +
 .../com/datatorrent/lib/appdata/StoreUtils.java |  84 +++++++++++
 .../lib/appdata/query/WindowBoundedService.java | 147 +++++++++++++++++++
 .../snapshot/AbstractAppDataSnapshotServer.java |  69 +++++++--
 .../lib/io/PubSubWebSocketAppDataQuery.java     |  82 ++++++++++-
 .../query/QueryManagerAsynchronousTest.java     |   1 -
 .../appdata/query/WindowBoundedServiceTest.java | 124 ++++++++++++++++
 pom.xml                                         |   4 +-
 10 files changed, 507 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 2e93e02..7976aa0 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -28,7 +28,7 @@
   </modules>
 
   <properties>
-    <datatorrent.version>3.0.0</datatorrent.version>
+    <datatorrent.version>3.1.0-SNAPSHOT</datatorrent.version>
     <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
     <semver.plugin.skip>true</semver.plugin.skip>
   </properties>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
index db59bdf..508b8a1 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
@@ -195,7 +195,7 @@ public class TwitterTopCounterApplication implements StreamingApplication
     if (!StringUtils.isEmpty(gatewayAddress)) {
       URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
 
-      AppDataSnapshotServerMap snapshotServer = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap());
+      AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
 
       Map<String, String> conversionMap = Maps.newHashMap();
       conversionMap.put(alias, WindowedTopCounter.FIELD_TYPE);
@@ -204,15 +204,15 @@ public class TwitterTopCounterApplication implements StreamingApplication
       snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
       snapshotServer.setTableFieldToMapField(conversionMap);
 
-      PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery());
+      PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
       wsQuery.setUri(uri);
-      Operator.OutputPort<String> queryPort = wsQuery.outputPort;
+      snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
+
       PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
       wsResult.setUri(uri);
       Operator.InputPort<String> queryResultPort = wsResult.input;
 
       dag.addStream("MapProvider", topCount, snapshotServer.input);
-      dag.addStream("Query", queryPort, snapshotServer.query);
       dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
     }
     else {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/twitter/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/META-INF/properties.xml b/demos/twitter/src/main/resources/META-INF/properties.xml
index 53b7fb9..e2547cc 100644
--- a/demos/twitter/src/main/resources/META-INF/properties.xml
+++ b/demos/twitter/src/main/resources/META-INF/properties.xml
@@ -90,6 +90,10 @@
     <value>TwitterHashtagQueryDemo</value>
   </property>
   <property>
+    <name>dt.application.TwitterTrendingDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
+    <value>TwitterHashtagQueryDemo</value>
+  </property>
+  <property>
     <name>dt.application.TwitterTrendingDemo.operator.QueryResult.topic</name>
     <value>TwitterHashtagQueryResultDemo</value>
   </property>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
new file mode 100644
index 0000000..11efc02
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.io.SimpleSinglePortInputOperator.BufferingOutputPort;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Sink;
+
+public class StoreUtils
+{
+  /**
+   * This is a utility method which attaches the output port of an {@link EmbeddableQueryInfoProvider} to the input port
+   * of the enclosing {@link AppData.Store}.
+   * @param <T> The type of data emitted by the {@link EmbeddableQueryInfoProvider}'s output port and received by the
+   * {@link AppData.Store}'s input port.
+   * @param outputPort The output port of the {@link EmbeddableQueryInfoProvider} which is being used by an {@link AppData.Store}.
+   * @param inputPort The input port of the {@link AppData.Store} which is using an {@link EmbeddableQueryInfoProvider}.
+   */
+  public static <T> void attachOutputPortToInputPort(DefaultOutputPort<T> outputPort, final DefaultInputPort<T> inputPort)
+  {
+    outputPort.setSink(
+      new Sink<Object>()
+      {
+        @Override
+        @SuppressWarnings("unchecked")
+        public void put(Object tuple)
+        {
+          LOG.debug("processing tuple");
+          inputPort.process((T)tuple);
+        }
+
+        @Override
+        public int getCount(boolean reset)
+        {
+          return 0;
+        }
+
+      }
+    );
+  }
+
+  /**
+   * This is a utility class which is responsible for flushing {@link BufferingOutputPort}s.
+   * @param <TUPLE_TYPE> The type of the tuple emitted by the {@link BufferingOutputPort}.
+   */
+  public static class BufferingOutputPortFlusher<TUPLE_TYPE> implements Runnable
+  {
+    private final BufferingOutputPort<TUPLE_TYPE> port;
+
+    public BufferingOutputPortFlusher(BufferingOutputPort<TUPLE_TYPE> port)
+    {
+      this.port = Preconditions.checkNotNull(port);
+    }
+
+    @Override
+    public void run()
+    {
+      port.flush(Integer.MAX_VALUE);
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class);
+}
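
For reference, a minimal sketch of how StoreUtils.attachOutputPortToInputPort short-circuits an output port into an input port. The port instances below are hypothetical stand-ins rather than operators from this patch; in real use the output port would come from EmbeddableQueryInfoProvider.getOutputPort() and the input port would be the store's query port.

  import com.datatorrent.api.DefaultInputPort;
  import com.datatorrent.api.DefaultOutputPort;

  import com.datatorrent.lib.appdata.StoreUtils;

  public class AttachPortSketch
  {
    public static void main(String[] args)
    {
      // Hypothetical input port standing in for a store's query port.
      DefaultInputPort<String> input = new DefaultInputPort<String>()
      {
        @Override
        public void process(String tuple)
        {
          System.out.println("received " + tuple);
        }
      };

      // Hypothetical output port standing in for an embedded query provider's port.
      DefaultOutputPort<String> output = new DefaultOutputPort<String>();

      // After attaching, tuples emitted on the output port are delivered
      // synchronously to the input port's process method.
      StoreUtils.attachOutputPortToInputPort(output, input);
      output.emit("hello");
    }
  }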

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
new file mode 100644
index 0000000..7b82f4f
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata.query;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This class asynchronously executes a function so that the function is only called between calls
+ * to {@link Operator#beginWindow} and {@link Operator#endWindow}.<br/><br/>
+ * This service works by asynchronously running the provided {@link Runnable} only after
+ * {@link #beginWindow} is called and before {@link #endWindow} returns. Calls to {@link #beginWindow}
+ * and {@link #endWindow} happen on the enclosing {@link Operator}'s main thread.
+ * <br/><br/>
+ * <b>Note:</b> This service cannot be used in operators which allow checkpointing within an
+ * application window.
+ */
+public class WindowBoundedService implements Component<OperatorContext>
+{
+  public static final long DEFAULT_FLUSH_INTERVAL_MILLIS = 10;
+
+  /**
+   * The interval, in milliseconds, at which the {@link Runnable} is executed.
+   */
+  private final long executeIntervalMillis;
+  /**
+   * The code to execute asynchronously.
+   */
+  private final Runnable runnable;
+  protected transient ExecutorService executorThread;
+
+  private final transient Semaphore mutex = new Semaphore(0);
+
+  public WindowBoundedService(Runnable runnable)
+  {
+    this.executeIntervalMillis = DEFAULT_FLUSH_INTERVAL_MILLIS;
+    this.runnable = Preconditions.checkNotNull(runnable);
+  }
+
+  public WindowBoundedService(long executeIntervalMillis,
+                              Runnable runnable)
+  {
+    Preconditions.checkArgument(executeIntervalMillis > 0,
+                                "The executeIntervalMillis must be positive");
+    this.executeIntervalMillis = executeIntervalMillis;
+    this.runnable = Preconditions.checkNotNull(runnable);
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    executorThread = Executors.newSingleThreadScheduledExecutor(new NameableThreadFactory("Query Executor Thread"));
+    executorThread.submit(new AsynchExecutorThread(Thread.currentThread()));
+  }
+
+  public void beginWindow(long windowId)
+  {
+    mutex.release();
+  }
+
+  public void endWindow()
+  {
+    try {
+      mutex.acquire();
+    } catch (InterruptedException ex) {
+      DTThrowable.wrapIfChecked(ex);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    executorThread.shutdownNow();
+  }
+
+  public class AsynchExecutorThread implements Callable<Void>
+  {
+    private final Thread mainThread;
+    private long lastExecuteTime = 0;
+
+    public AsynchExecutorThread(Thread mainThread)
+    {
+      this.mainThread = mainThread;
+    }
+
+    @Override
+    @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch"})
+    public Void call() throws Exception
+    {
+      try {
+        loop();
+      } catch (Exception e) {
+        LOG.error("Exception thrown while processing:", e);
+        mutex.release();
+        mainThread.interrupt();
+      }
+
+      return null;
+    }
+
+    @SuppressWarnings("SleepWhileInLoop")
+    private void loop() throws Exception
+    {
+      while (true) {
+        long currentTime = System.currentTimeMillis();
+        long diff = currentTime - lastExecuteTime;
+        if (diff > executeIntervalMillis) {
+          lastExecuteTime = currentTime;
+          mutex.acquireUninterruptibly();
+          runnable.run();
+          mutex.release();
+        } else {
+          Thread.sleep(executeIntervalMillis - diff);
+        }
+      }
+    }
+  }
+
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(WindowBoundedService.class);
+}
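
A minimal usage sketch of the lifecycle above, modeled on the WindowBoundedServiceTest added later in this patch; the Runnable, window id, and sleep are illustrative, and setup(null) stands in for the OperatorContext an enclosing operator would pass.

  import java.util.concurrent.atomic.AtomicInteger;

  import com.datatorrent.lib.appdata.query.WindowBoundedService;

  public class WindowBoundedServiceSketch
  {
    public static void main(String[] args) throws InterruptedException
    {
      final AtomicInteger executions = new AtomicInteger();

      // Run the task roughly every 10 ms, but only between beginWindow and endWindow.
      WindowBoundedService service = new WindowBoundedService(10, new Runnable()
      {
        @Override
        public void run()
        {
          executions.incrementAndGet();
        }
      });

      service.setup(null);     // normally called from Operator.setup(context)
      service.beginWindow(1L); // releases the semaphore so the task may run
      Thread.sleep(100);
      service.endWindow();     // blocks until any in-flight execution completes
      service.teardown();      // normally called from Operator.teardown()

      System.out.println("task ran " + executions.get() + " times inside the window");
    }
  }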

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index d0241d2..a51908f 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import javax.validation.constraints.NotNull;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.mutable.MutableLong;
 
+import com.datatorrent.lib.appdata.StoreUtils;
 import com.datatorrent.lib.appdata.gpo.GPOMutable;
 import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager;
 import com.datatorrent.lib.appdata.query.QueryExecutor;
@@ -43,8 +45,10 @@ import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 
 import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
 
 /**
  * This is an abstract operator for the {@link SnapshotSchema}. This operator is designed to accept input data
@@ -54,7 +58,7 @@ import com.datatorrent.common.experimental.AppData;
  * @param <INPUT_EVENT> The type of the input events that the operator accepts.
  * @since 3.0.0
  */
-public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Operator
+public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Operator, AppData.Store<String>
 {
   /**
    * The {@link QueryManagerSynchronous} for the operator.
@@ -84,11 +88,13 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
    * The current data to be served by the operator.
    */
   private List<GPOMutable> currentData = Lists.newArrayList();
+  private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
 
   @AppData.ResultPort
   public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<String>();
 
   @AppData.QueryPort
+  @InputPortFieldAnnotation(optional=true)
   public transient final DefaultInputPort<String> query = new DefaultInputPort<String>()
   {
     @Override
@@ -99,24 +105,22 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
 
       try {
         query = queryDeserializerFactory.deserialize(queryJSON);
-      }
-      catch(IOException ex) {
+      } catch (IOException ex) {
         LOG.error("Error parsing query: {}", queryJSON);
         LOG.error("{}", ex);
         return;
       }
 
-      if(query instanceof SchemaQuery) {
-        SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery) query);
+      if (query instanceof SchemaQuery) {
+        SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query);
 
-        if(schemaResult != null) {
+        if (schemaResult != null) {
           String schemaResultJSON = resultSerializerFactory.serialize(schemaResult);
           LOG.debug("emitting {}", schemaResultJSON);
           queryResult.emit(schemaResultJSON);
         }
-      }
-      else if(query instanceof DataQuerySnapshot) {
-        queryProcessor.enqueue((DataQuerySnapshot) query, null, null);
+      } else if (query instanceof DataQuerySnapshot) {
+        queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
       }
     }
   };
@@ -150,6 +154,13 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
    */
   public abstract GPOMutable convert(INPUT_EVENT inputEvent);
 
+
+  @Override
+  public final void activate(OperatorContext ctx)
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.activate(ctx);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void setup(OperatorContext context)
@@ -164,17 +175,33 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry);
     resultSerializerFactory = new MessageSerializerFactory(resultFormatter);
     queryProcessor.setup(context);
+
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.enableEmbeddedMode();
+      LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName());
+      StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(),
+                                             query);
+      embeddableQueryInfoProvider.setup(context);
+    }
   }
 
   @Override
   public void beginWindow(long windowId)
   {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.beginWindow(windowId);
+    }
+
     queryProcessor.beginWindow(windowId);
   }
 
   @Override
   public void endWindow()
   {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.endWindow();
+    }
+
     {
       Result result = null;
 
@@ -191,9 +218,21 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   @Override
   public void teardown()
   {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.teardown();
+    }
+
     queryProcessor.teardown();
   }
 
+  @Override
+  public void deactivate()
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.deactivate();
+    }
+  }
+
   /**
    * Gets the JSON for the schema.
    * @return the JSON for the schema.
@@ -230,6 +269,18 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     this.resultFormatter = resultFormatter;
   }
 
+  @Override
+  public EmbeddableQueryInfoProvider<String> getEmbeddableQueryInfoProvider()
+  {
+    return embeddableQueryInfoProvider;
+  }
+
+  @Override
+  public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider)
+  {
+    this.embeddableQueryInfoProvider = Preconditions.checkNotNull(embeddableQueryInfoProvider);
+  }
+
   /**
    * The {@link QueryExecutor} which returns the results for queries.
    */

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 14a2d2b..031befd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -19,16 +19,23 @@ package com.datatorrent.lib.io;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import javax.validation.constraints.Min;
+
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.appdata.StoreUtils.BufferingOutputPortFlusher;
+import com.datatorrent.lib.appdata.query.WindowBoundedService;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
 
 import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
 import com.datatorrent.common.util.PubSubMessage;
 
 /**
@@ -40,11 +47,18 @@ import com.datatorrent.common.util.PubSubMessage;
  * @tags input, app data, query
  * @since 3.0.0
  */
-public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider
+public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider, EmbeddableQueryInfoProvider<String>
 {
   private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketAppDataQuery.class);
 
   private static final long serialVersionUID = 201506121124L;
+  public static final long DEFAULT_EXECUTE_INTERVAL_MILLIS = 10;
+
+  private boolean useEmitThread;
+  @Min(0)
+  private long executeIntervalMillis = DEFAULT_EXECUTE_INTERVAL_MILLIS;
+
+  private transient WindowBoundedService windowBoundedService;
 
   public PubSubWebSocketAppDataQuery()
   {
@@ -57,6 +71,42 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
     this.uri = uriHelper(context, uri);
     logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
     super.setup(context);
+
+    if (useEmitThread) {
+      windowBoundedService = new WindowBoundedService(executeIntervalMillis,
+                                                      new BufferingOutputPortFlusher<>(this.outputPort));
+      windowBoundedService.setup(context);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    
+    if (windowBoundedService != null) {
+      windowBoundedService.beginWindow(windowId);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (windowBoundedService != null) {
+      windowBoundedService.endWindow();
+    }
+
+    super.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    if (windowBoundedService != null) {
+      windowBoundedService.teardown();
+    }
+
+    super.teardown();
   }
 
   public static URI uriHelper(OperatorContext context, URI uri)
@@ -139,4 +189,34 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
   {
     return "pubsub";
   }
+
+  @Override
+  public DefaultOutputPort<String> getOutputPort()
+  {
+    return outputPort;
+  }
+
+  @Override
+  public void enableEmbeddedMode()
+  {
+    useEmitThread = true;
+  }
+
+  /**
+   * Get the number of milliseconds between calls to execute.
+   * @return The number of milliseconds between calls to execute.
+   */
+  public long getExecuteIntervalMillis()
+  {
+    return executeIntervalMillis;
+  }
+
+  /**
+   * Sets the number of milliseconds between calls to execute.
+   * @param executeIntervalMillis The number of milliseconds between calls to execute.
+   */
+  public void setExecuteIntervalMillis(long executeIntervalMillis)
+  {
+    this.executeIntervalMillis = executeIntervalMillis;
+  }
 }
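
A hedged sketch of configuring this operator as an embedded query provider for a snapshot server, following the TwitterTopCounterApplication change earlier in this patch. The gateway address and topic are illustrative, and AppDataSnapshotServerMap is assumed to live in com.datatorrent.lib.appdata.snapshot.

  import java.net.URI;

  import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; // assumed package
  import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;

  public class EmbeddedQuerySketch
  {
    public static AppDataSnapshotServerMap newSnapshotServer(String gatewayAddress)
    {
      PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
      wsQuery.setUri(URI.create("ws://" + gatewayAddress + "/pubsub"));
      wsQuery.setTopic("ExampleQueryTopic"); // illustrative topic name
      wsQuery.setExecuteIntervalMillis(10);  // flush cadence of the embedded emit thread

      // The operator is not added to the DAG. The snapshot server enables embedded
      // mode, attaches the query output port to its own query input port, and drives
      // setup/beginWindow/endWindow/teardown on the provider itself.
      AppDataSnapshotServerMap snapshotServer = new AppDataSnapshotServerMap();
      snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
      return snapshotServer;
    }
  }

In the DAG only the data input and the Result stream to PubSubWebSocketAppDataResult then need to be wired, as the Twitter demo diff above shows.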

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
index ef5f0f5..277bb3a 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
@@ -54,7 +54,6 @@ public class QueryManagerAsynchronousTest
         Thread.sleep(200);
       }
       catch(InterruptedException ex) {
-        throw new RuntimeException(ex);
       }
       Thread.interrupted();
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
new file mode 100644
index 0000000..3674ce5
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata.query;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+
+import com.datatorrent.lib.appdata.query.QueryManagerAsynchronousTest.InterruptClear;
+
+public class WindowBoundedServiceTest
+{
+  @Rule
+  public TestWatcher testMeta = new InterruptClear();
+
+  @Test
+  public void simpleLoopTest() throws Exception
+  {
+    CounterRunnable counterRunnable = new CounterRunnable();
+
+    WindowBoundedService wbs = new WindowBoundedService(1,
+                                                        counterRunnable);
+    wbs.setup(null);
+    Thread.sleep(500);
+    Assert.assertEquals(0, counterRunnable.getCounter());
+    wbs.beginWindow(0);
+    Thread.sleep(500);
+    wbs.endWindow();
+    int currentCount = counterRunnable.getCounter();
+    Thread.sleep(500);
+    wbs.teardown();
+    Assert.assertEquals(currentCount, counterRunnable.getCounter());
+  }
+
+  @Test
+  public void runTest() throws Exception
+  {
+    CounterRunnable counterRunnable = new CounterRunnable();
+
+    WindowBoundedService wbs = new WindowBoundedService(1,
+                                                        counterRunnable);
+    wbs.setup(null);
+    wbs.beginWindow(0);
+    Thread.sleep(500);
+    wbs.endWindow();
+    wbs.teardown();
+    Assert.assertTrue(counterRunnable.getCounter() > 0);
+  }
+
+  @Test
+  public void exceptionTest() throws Exception
+  {
+    WindowBoundedService wbs = new WindowBoundedService(1,
+                                                        new ExceptionRunnable());
+
+    wbs.setup(null);
+    wbs.beginWindow(0);
+
+    boolean caughtException = false;
+
+    try {
+      Thread.sleep(500);
+    } catch (InterruptedException e) {
+      caughtException = true;
+    }
+
+    try {
+      wbs.endWindow();
+    } catch(Exception e) {
+      caughtException = true;
+    }
+
+    wbs.teardown();
+    Assert.assertEquals(true, caughtException);
+  }
+
+  public static class CounterRunnable implements Runnable
+  {
+    private int counter = 0;
+
+    public CounterRunnable()
+    {
+    }
+
+    @Override
+    public void run()
+    {
+      counter++;
+    }
+
+    public int getCounter()
+    {
+      return counter;
+    }
+  }
+
+  public static class ExceptionRunnable implements Runnable
+  {
+    public ExceptionRunnable()
+    {
+    }
+
+    @Override
+    public void run()
+    {
+      throw new RuntimeException("Simulate Failure");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c95f524..e9dbcd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <groupId>com.datatorrent</groupId>
     <artifactId>dt-framework</artifactId>
-    <version>3.0.0</version>
+    <version>3.1.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>malhar</artifactId>
@@ -38,7 +38,7 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <netbeans.hint.license>malhar-inc</netbeans.hint.license>
     <maven.deploy.skip>false</maven.deploy.skip>
-    <dt.framework.version>3.0.0</dt.framework.version>
+    <dt.framework.version>3.1.0-SNAPSHOT</dt.framework.version>
     <!-- the following properties match the properties defined in core/pom.xml -->
     <jackson.version>1.9.2</jackson.version>
     <jersey.version>1.9</jersey.version>


[21/25] incubator-apex-malhar git commit: Improved robustness of query checking and validation for snapshot schema.

Posted by da...@apache.org.
Improved robustness of query checking and validation for snapshot schema.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6cff9117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6cff9117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6cff9117

Branch: refs/heads/feature-AppData
Commit: 6cff911728b6956e1e4c246aed3fb315707f4032
Parents: 6155673
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Wed Aug 5 14:36:21 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:31 2015 -0700

----------------------------------------------------------------------
 .../serde/DataQuerySnapshotDeserializer.java    |  65 ++++++++--
 .../query/serde/DataQuerySnapshotValidator.java |   5 +
 .../lib/appdata/schemas/DataQuerySnapshot.java  |   6 +-
 .../lib/appdata/schemas/SchemaUtils.java        |  70 ++++++-----
 .../DataQuerySnapshotDeserializerTest.java      | 123 ++++++++++++++++---
 .../resources/snapshotquery_deserialize1.json   |   7 ++
 .../resources/snapshotquery_deserialize2.json   |  12 ++
 .../resources/snapshotquery_deserialize3.json   |   4 +
 .../resources/snapshotquery_deserialize4.json   |  13 ++
 .../resources/snapshotquery_deserialize5.json   |  13 ++
 .../resources/snapshotquery_deserialize6.json   |  14 +++
 .../resources/snapshotquery_deserialize7.json   |  15 +++
 .../snapshotquery_invalidcountdown.json         |  13 ++
 .../resources/snapshotquery_validation1.json    |   3 +
 .../resources/snapshotquery_validation2.json    |   3 +
 .../resources/snapshotquery_validation3.json    |   6 +
 16 files changed, 311 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java
index bdc761f..9cc080a 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java
@@ -30,8 +30,6 @@ import org.slf4j.LoggerFactory;
 import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot;
 import com.datatorrent.lib.appdata.schemas.Fields;
 import com.datatorrent.lib.appdata.schemas.Message;
-import com.datatorrent.lib.appdata.schemas.QRBase;
-import com.datatorrent.lib.appdata.schemas.Query;
 import com.datatorrent.lib.appdata.schemas.SchemaUtils;
 
 /**
@@ -40,6 +38,41 @@ import com.datatorrent.lib.appdata.schemas.SchemaUtils;
  */
 public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
 {
+  public static final Set<Fields> FIRST_LEVEL_FIELD_COMBINATIONS;
+  public static final Set<Fields> DATA_FIELD_COMBINATIONS;
+
+  static {
+    Set<Fields> firstLevelFieldCombinations = Sets.newHashSet();
+    firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+                                                               DataQuerySnapshot.FIELD_TYPE,
+                                                               DataQuerySnapshot.FIELD_COUNTDOWN,
+                                                               DataQuerySnapshot.FIELD_DATA,
+                                                               DataQuerySnapshot.FIELD_INCOMPLETE_RESULTS_OK)));
+    firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+                                                               DataQuerySnapshot.FIELD_TYPE,
+                                                               DataQuerySnapshot.FIELD_DATA,
+                                                               DataQuerySnapshot.FIELD_INCOMPLETE_RESULTS_OK)));
+    firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+                                                               DataQuerySnapshot.FIELD_TYPE,
+                                                               DataQuerySnapshot.FIELD_COUNTDOWN,
+                                                               DataQuerySnapshot.FIELD_DATA)));
+    firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+                                                               DataQuerySnapshot.FIELD_TYPE,
+                                                               DataQuerySnapshot.FIELD_DATA)));
+    firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+                                                               DataQuerySnapshot.FIELD_TYPE)));
+
+    FIRST_LEVEL_FIELD_COMBINATIONS = firstLevelFieldCombinations;
+
+    Set<Fields> dataFieldCombinations = Sets.newHashSet();
+    dataFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_SCHEMA_KEYS,
+                                                         DataQuerySnapshot.FIELD_FIELDS)));
+    dataFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_SCHEMA_KEYS)));
+    dataFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_FIELDS)));
+
+    DATA_FIELD_COMBINATIONS = dataFieldCombinations;
+  }
+
   /**
    * Constructor used to instantiate deserializer in {@link MessageDeserializerFactory}.
    */
@@ -57,7 +90,11 @@ public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
                                context);
     }
     catch(Exception ex) {
-      throw new IOException(ex);
+      if (ex instanceof IOException) {
+        throw (IOException) ex;
+      } else {
+        throw new IOException(ex);
+      }
     }
   }
 
@@ -74,9 +111,14 @@ public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
   {
     JSONObject jo = new JSONObject(json);
 
+    //Validate fields
+    if (!SchemaUtils.checkValidKeys(jo, FIRST_LEVEL_FIELD_COMBINATIONS)) {
+      throw new IOException("Invalid keys");
+    }
+
     //// Query id stuff
-    String id = jo.getString(QRBase.FIELD_ID);
-    String type = jo.getString(Message.FIELD_TYPE);
+    String id = jo.getString(DataQuerySnapshot.FIELD_ID);
+    String type = jo.getString(DataQuerySnapshot.FIELD_TYPE);
 
     if(!type.equals(DataQuerySnapshot.TYPE)) {
       LOG.error("Found type {} in the query json, but expected type {}.", type, DataQuerySnapshot.TYPE);
@@ -85,10 +127,10 @@ public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
 
     /// Countdown
     long countdown = -1L;
-    boolean hasCountdown = jo.has(QRBase.FIELD_COUNTDOWN);
+    boolean hasCountdown = jo.has(DataQuerySnapshot.FIELD_COUNTDOWN);
 
     if(hasCountdown) {
-      countdown = jo.getLong(QRBase.FIELD_COUNTDOWN);
+      countdown = jo.getLong(DataQuerySnapshot.FIELD_COUNTDOWN);
     }
 
     ////Data
@@ -98,8 +140,13 @@ public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
     if(jo.has(DataQuerySnapshot.FIELD_DATA)) {
       JSONObject data = jo.getJSONObject(DataQuerySnapshot.FIELD_DATA);
 
-      if(data.has(Query.FIELD_SCHEMA_KEYS)) {
-        schemaKeys = SchemaUtils.extractMap(data.getJSONObject(Query.FIELD_SCHEMA_KEYS));
+      if (!SchemaUtils.checkValidKeys(data, DATA_FIELD_COMBINATIONS)) {
+        LOG.error("Error validating {} field", DataQuerySnapshot.FIELD_DATA);
+        throw new IOException("Invalid keys");
+      }
+
+      if (data.has(DataQuerySnapshot.FIELD_SCHEMA_KEYS)) {
+        schemaKeys = SchemaUtils.extractMap(data.getJSONObject(DataQuerySnapshot.FIELD_SCHEMA_KEYS));
       }
 
       if(data.has(DataQuerySnapshot.FIELD_FIELDS)) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
index d13dcc4..d2b7ef0 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot;
+import com.datatorrent.lib.appdata.schemas.Fields;
 import com.datatorrent.lib.appdata.schemas.Message;
 import com.datatorrent.lib.appdata.schemas.SchemaRegistry;
 import com.datatorrent.lib.appdata.schemas.SnapshotSchema;
@@ -53,6 +54,10 @@ public class DataQuerySnapshotValidator implements CustomMessageValidator
       return false;
     }
 
+    if (gdqt.getFields().getFields().isEmpty()) {
+      gdqt.setFields(new Fields(fields));
+    }
+
     return true;
   }
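
The effect of this defaulting can be seen end to end through the deserializer factory; a minimal sketch mirroring the noFieldsSpecified test added later in this patch (the schema and query resources are the ones used by that test and are assumed to be on the classpath).

  import com.datatorrent.lib.appdata.query.serde.MessageDeserializerFactory;
  import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot;
  import com.datatorrent.lib.appdata.schemas.SchemaQuery;
  import com.datatorrent.lib.appdata.schemas.SchemaRegistrySingle;
  import com.datatorrent.lib.appdata.schemas.SchemaUtils;
  import com.datatorrent.lib.appdata.schemas.SnapshotSchema;

  public class FieldDefaultingSketch
  {
    public static void main(String[] args) throws Exception
    {
      String snapshotSchemaJSON = SchemaUtils.jarResourceFileToString("snapshotschema.json");
      SchemaRegistrySingle schemaRegistry = new SchemaRegistrySingle(new SnapshotSchema(snapshotSchemaJSON));

      MessageDeserializerFactory factory = new MessageDeserializerFactory(SchemaQuery.class, DataQuerySnapshot.class);
      factory.setContext(DataQuerySnapshot.class, schemaRegistry);

      // snapshotquery_deserialize3.json contains only an id and a type, no fields.
      String queryJSON = SchemaUtils.jarResourceFileToString("snapshotquery_deserialize3.json");
      DataQuerySnapshot query = (DataQuerySnapshot)factory.deserialize(queryJSON);

      // The validator has filled in every field declared by the schema
      // (boolField, intField and doubleField in the test schema).
      System.out.println(query.getFields().getFields());
    }
  }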
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
index 0943172..5714597 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
@@ -51,6 +51,10 @@ public class DataQuerySnapshot extends Query
    * The JSON string for the schemaKeys in the query.
    */
   public static final String SCHEMA_KEYS = "schemaKeys";
+  /**
+   * The JSON string for the incompleteResultOK field in the query.
+   */
+  public static final String FIELD_INCOMPLETE_RESULTS_OK = "incompleteResultOK";
 
   /**
    * The fields requested to be returned in the query.
@@ -132,7 +136,7 @@ public class DataQuerySnapshot extends Query
    * Sets the fields of the query.
    * @param fields The fields of the query.
    */
-  private void setFields(Fields fields)
+  public final void setFields(Fields fields)
   {
     Preconditions.checkNotNull(fields);
     this.fields = fields;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
index dbd276c..ce68c7f 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
@@ -19,8 +19,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
 
+import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -100,18 +100,10 @@ public class SchemaUtils
                                        Fields fields)
   {
     @SuppressWarnings("unchecked")
-    Iterator<String> keyIterator = jo.keys();
     Set<String> fieldSet = fields.getFields();
+    Set<String> jsonKeys = getSetOfJSONKeys(jo);
 
-    while(keyIterator.hasNext()) {
-      String key = keyIterator.next();
-
-      if(!fieldSet.contains(key)) {
-        return false;
-      }
-    }
-
-    return true;
+    return jsonKeys.containsAll(fieldSet);
   }
 
   /**
@@ -124,34 +116,38 @@ public class SchemaUtils
                                       Fields fields)
   {
     @SuppressWarnings("unchecked")
-    Iterator<String> keyIterator = jo.keys();
     Set<String> fieldSet = fields.getFields();
+    Set<String> jsonKeys = getSetOfJSONKeys(jo);
 
-    while(keyIterator.hasNext()) {
-      String key = keyIterator.next();
+    if (!jsonKeys.containsAll(fieldSet)) {
 
-      if(!fieldSet.contains(key)) {
-        throw new IllegalArgumentException("The key " + key +
-                                           " is not valid.");
-      }
+      throw new IllegalArgumentException("The given set of keys "
+                                         + fieldSet
+                                         + " doesn't equal the set of keys in the json "
+                                         + jsonKeys);
     }
   }
 
   /**
    * This is a utility method to check that the given JSONObject has the given keys.
    * @param jo The {@link JSONObject} to check.
-   * @param fieldsList The keys in the {@link JSONObject} to check.
+   * @param fieldsCollection The acceptable key combinations for the {@link JSONObject}.
    * @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
    */
   public static boolean checkValidKeys(JSONObject jo,
-                                       List<Fields> fieldsList)
+                                       Collection<Fields> fieldsCollection)
   {
-    for(Fields fields: fieldsList) {
-      if(checkValidKeys(jo, fields)) {
+    for (Fields fields: fieldsCollection) {
+      LOG.debug("Checking keys: {}", fields);
+      if (checkValidKeys(jo, fields)) {
         return true;
       }
     }
 
+    LOG.error("The first level of keys in the provided JSON {} do not match any of the " +
+              "valid keysets {}",
+              getSetOfJSONKeys(jo),
+              fieldsCollection);
     return false;
   }
 
@@ -159,31 +155,37 @@ public class SchemaUtils
    * This is a utility method to check that the given JSONObject has the given keys.
    * It throws an {@link IllegalArgumentException} if it doesn't contain all the given keys.
    * @param jo The {@link JSONObject} to check.
-   * @param fieldsList The keys in the {@link JSONObject} to check.
+   * @param fieldsCollection The acceptable key combinations for the {@link JSONObject}.
    * @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
    */
   public static boolean checkValidKeysEx(JSONObject jo,
-                                         List<Fields> fieldsList)
+                                         Collection<Fields> fieldsCollection)
   {
-    for(Fields fields: fieldsList) {
-      if(checkValidKeys(jo, fields)) {
+    for (Fields fields: fieldsCollection) {
+      if (checkValidKeys(jo, fields)) {
         return true;
       }
     }
 
-    Set<String> keys = Sets.newHashSet();
+    Set<String> keys = getSetOfJSONKeys(jo);
+
+    throw new IllegalArgumentException("The given json object has an invalid set of keys: " +
+                                       keys +
+                                       "\nOne of the following key combinations was expected:\n" +
+                                       fieldsCollection);
+  }
+
+  public static Set<String> getSetOfJSONKeys(JSONObject jo)
+  {
     @SuppressWarnings("unchecked")
     Iterator<String> keyIterator = jo.keys();
+    Set<String> keySet = Sets.newHashSet();
 
-    while(keyIterator.hasNext()) {
-      String key = keyIterator.next();
-      keys.add(key);
+    while (keyIterator.hasNext()) {
+      keySet.add(keyIterator.next());
     }
 
-    throw new IllegalArgumentException("The given json object has an invalid set of keys: " +
-                                       keys +
-                                       "\nOne of the following key combinations was expected:\n" +
-                                       fieldsList);
+    return keySet;
   }
 
   public static Map<String, String> convertFieldToType(Map<String, Type> fieldToType)
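
A small sketch of how the deserializer above uses the collection-based checkValidKeys; the JSON literal and key combinations here are illustrative.

  import java.util.Set;

  import com.google.common.collect.Sets;

  import org.codehaus.jettison.json.JSONObject;

  import com.datatorrent.lib.appdata.schemas.Fields;
  import com.datatorrent.lib.appdata.schemas.SchemaUtils;

  public class CheckValidKeysSketch
  {
    public static void main(String[] args) throws Exception
    {
      JSONObject query = new JSONObject("{\"id\":\"1\",\"type\":\"dataQuery\",\"countdown\":10}");

      Set<Fields> combinations = Sets.newHashSet();
      combinations.add(new Fields(Sets.newHashSet("id", "type")));
      combinations.add(new Fields(Sets.newHashSet("id", "type", "countdown")));

      // Prints true: the json's keys {id, type, countdown} contain every key of
      // at least one acceptable combination (here, both of them).
      System.out.println(SchemaUtils.checkValidKeys(query, combinations));
    }
  }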

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java
index a79cd63..f267571 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java
@@ -15,30 +15,48 @@
  */
 package com.datatorrent.lib.appdata.schemas;
 
+import java.io.IOException;
+
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestWatcher;
 
 import com.datatorrent.lib.appdata.query.serde.DataQuerySnapshotDeserializer;
+import com.datatorrent.lib.appdata.query.serde.MessageDeserializerFactory;
 
 public class DataQuerySnapshotDeserializerTest
 {
+  @Rule
+  public DataQuerySnapshotInfo testMeta = new DataQuerySnapshotInfo();
+
+  public static class DataQuerySnapshotInfo extends TestWatcher
+  {
+    public MessageDeserializerFactory queryDeserializerFactory;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String snapshotSchemaJSON = SchemaUtils.jarResourceFileToString("snapshotschema.json");
+      SchemaRegistrySingle schemaRegistry = new SchemaRegistrySingle(new SnapshotSchema(snapshotSchemaJSON));
+      queryDeserializerFactory  = new MessageDeserializerFactory(SchemaQuery.class,
+                                                                 DataQuerySnapshot.class);
+      queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry);
+    }
+  }
+
   @Test
   public void simpleDeserializerTest() throws Exception
   {
     DataQuerySnapshotDeserializer deserializer = new DataQuerySnapshotDeserializer();
 
-    String queryJSON = "{\n"
-                       + "   \"id\": \"1\",\n"
-                       + "   \"type\": \"dataQuery\",\n"
-                       + "   \"data\": {\n"
-                       + "      \"fields\": [ \"url\", \"count\" ]\n"
-                       + "   }\n"
-                       + "}";
+    String queryJSON = SchemaUtils.jarResourceFileToString("snapshotquery_deserialize1.json");
 
     DataQuerySnapshot gQuery = (DataQuerySnapshot) deserializer.deserialize(queryJSON, DataQuerySnapshot.class, null);
 
@@ -50,7 +68,6 @@ public class DataQuerySnapshotDeserializerTest
     Assert.assertEquals("The fields must equal.", fields, gQuery.getFields());
   }
 
-
   @Test
   public void simpleDeserializerWithSchemaKeysTest() throws Exception
   {
@@ -61,15 +78,7 @@ public class DataQuerySnapshotDeserializerTest
 
     DataQuerySnapshotDeserializer deserializer = new DataQuerySnapshotDeserializer();
 
-    String queryJSON = "{\n"
-                       + "   \"id\": \"1\",\n"
-                       + "   \"type\": \"dataQuery\",\n"
-                       + "   \"data\": {\n"
-                       + "      \"schemaKeys\":"
-                       + "      {\"publisher\":\"google\",\"advertiser\":\"microsoft\",\"location\":\"CA\"},"
-                       + "      \"fields\": [ \"url\", \"count\" ]\n"
-                       + "   }\n"
-                       + "}";
+    String queryJSON = SchemaUtils.jarResourceFileToString("snapshotquery_deserialize2.json");
 
     DataQuerySnapshot gQuery = (DataQuerySnapshot) deserializer.deserialize(queryJSON, DataQuerySnapshot.class, null);
 
@@ -81,4 +90,84 @@ public class DataQuerySnapshotDeserializerTest
     Assert.assertEquals("The fields must equal.", fields, gQuery.getFields());
     Assert.assertEquals(expectedSchemaKeys, gQuery.getSchemaKeys());
   }
+
+  @Test
+  public void noFieldsSpecified() throws Exception
+  {
+    String snapshotQuery = SchemaUtils.jarResourceFileToString("snapshotquery_deserialize3.json");
+    DataQuerySnapshot query = (DataQuerySnapshot) testMeta.queryDeserializerFactory.deserialize(snapshotQuery);
+
+    Set<String> expectedFields = Sets.newHashSet("boolField", "intField", "doubleField");
+
+    Assert.assertEquals(expectedFields, query.getFields().getFields());
+  }
+
+  @Test
+  public void validDeserialize1Test() throws Exception
+  {
+    testValid("snapshotquery_deserialize4.json");
+  }
+
+  @Test
+  public void validDeserialize2Test() throws Exception
+  {
+    testValid("snapshotquery_deserialize5.json");
+  }
+
+  @Test
+  public void validDeserialize3Test() throws Exception
+  {
+    testValid("snapshotquery_deserialize6.json");
+  }
+
+  @Test
+  public void validDeserializeExtraFieldTest() throws Exception
+  {
+    testValid("snapshotquery_deserialize7.json");
+  }
+
+  @Test
+  public void invalidTestCountdownValue() throws Exception
+  {
+    testInvalid("snapshotquery_invalidcountdown.json");
+  }
+
+  @Test
+  public void invalidTest1() throws Exception
+  {
+    testInvalid("snapshotquery_validation1.json");
+  }
+
+  @Test
+  public void invalidTest2() throws Exception
+  {
+    testInvalid("snapshotquery_validation2.json");
+  }
+
+  @Test
+  public void invalidTest3() throws Exception
+  {
+    testInvalid("snapshotquery_validation3.json");
+  }
+
+  private void testInvalid(String invalidResourceJSON) throws Exception
+  {
+    boolean caughtException = false;
+
+    try {
+      String snapshotQuery = SchemaUtils.jarResourceFileToString(invalidResourceJSON);
+      testMeta.queryDeserializerFactory.deserialize(snapshotQuery);
+    } catch (IOException e) {
+      caughtException = true;
+    }
+
+    Assert.assertTrue(caughtException);
+  }
+
+  private void testValid(String validResourceJSON) throws Exception
+  {
+    String snapshotQuery = SchemaUtils.jarResourceFileToString(validResourceJSON);
+    DataQuerySnapshot query = (DataQuerySnapshot) testMeta.queryDeserializerFactory.deserialize(snapshotQuery);
+    Assert.assertNotNull(query);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize1.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize1.json b/library/src/test/resources/snapshotquery_deserialize1.json
new file mode 100644
index 0000000..13c80ac
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize1.json
@@ -0,0 +1,7 @@
+{
+"id": 1,
+"type": "dataQuery",
+"data": {
+    "fields": ["url", "count"]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize2.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize2.json b/library/src/test/resources/snapshotquery_deserialize2.json
new file mode 100644
index 0000000..8ebc168
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize2.json
@@ -0,0 +1,12 @@
+{
+  "id":1,
+  "type": "dataQuery",
+  "data": {
+    "schemaKeys": {
+      "publisher":"google",
+      "advertiser":"microsoft",
+      "location":"CA"
+    },
+    "fields": ["url", "count"]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize3.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize3.json b/library/src/test/resources/snapshotquery_deserialize3.json
new file mode 100644
index 0000000..2d706af
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize3.json
@@ -0,0 +1,4 @@
+{
+"id": 1,
+"type": "dataQuery"
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize4.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize4.json b/library/src/test/resources/snapshotquery_deserialize4.json
new file mode 100644
index 0000000..3a150cf
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize4.json
@@ -0,0 +1,13 @@
+{
+  "id":1,
+  "type": "dataQuery",
+  "data": {
+    "schemaKeys": {
+      "publisher":"google",
+      "advertiser":"microsoft",
+      "location":"CA"
+    },
+    "fields": ["boolField", "doubleField"]
+  },
+  "countdown":10
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize5.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize5.json b/library/src/test/resources/snapshotquery_deserialize5.json
new file mode 100644
index 0000000..e4907ad
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize5.json
@@ -0,0 +1,13 @@
+{
+  "id":1,
+  "type": "dataQuery",
+  "data": {
+    "schemaKeys": {
+      "publisher":"google",
+      "advertiser":"microsoft",
+      "location":"CA"
+    },
+    "fields": ["boolField", "doubleField"]
+  },
+  "incompleteResultOK":true
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize6.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize6.json b/library/src/test/resources/snapshotquery_deserialize6.json
new file mode 100644
index 0000000..abeddf4
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize6.json
@@ -0,0 +1,14 @@
+{
+  "id":1,
+  "type": "dataQuery",
+  "data": {
+    "schemaKeys": {
+      "publisher":"google",
+      "advertiser":"microsoft",
+      "location":"CA"
+    },
+    "fields": ["boolField", "doubleField"]
+  },
+  "incompleteResultOK":true,
+  "countdown":10
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize7.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize7.json b/library/src/test/resources/snapshotquery_deserialize7.json
new file mode 100644
index 0000000..3347b3c
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize7.json
@@ -0,0 +1,15 @@
+{
+  "id":1,
+  "type": "dataQuery",
+  "data": {
+    "schemaKeys": {
+      "publisher":"google",
+      "advertiser":"microsoft",
+      "location":"CA"
+    },
+    "fields": ["boolField", "doubleField"]
+  },
+  "incompleteResultOK":true,
+  "countdown":10,
+  "blahblahblahinvalid":"a"
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_invalidcountdown.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_invalidcountdown.json b/library/src/test/resources/snapshotquery_invalidcountdown.json
new file mode 100644
index 0000000..8551c83
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_invalidcountdown.json
@@ -0,0 +1,13 @@
+{
+  "id":1,
+  "type": "dataQuery",
+  "countdown":-10,
+  "data": {
+    "schemaKeys": {
+      "publisher":"google",
+      "advertiser":"microsoft",
+      "location":"CA"
+    },
+    "fields": ["url", "count"]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_validation1.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_validation1.json b/library/src/test/resources/snapshotquery_validation1.json
new file mode 100644
index 0000000..2572ae5
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_validation1.json
@@ -0,0 +1,3 @@
+{
+  "id": 1
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_validation2.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_validation2.json b/library/src/test/resources/snapshotquery_validation2.json
new file mode 100644
index 0000000..40fe011
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_validation2.json
@@ -0,0 +1,3 @@
+{
+  "type": "dataQuery"
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_validation3.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_validation3.json b/library/src/test/resources/snapshotquery_validation3.json
new file mode 100644
index 0000000..e5aaad0
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_validation3.json
@@ -0,0 +1,6 @@
+{
+  "id": 1,
+  "type": "dataQuery",
+  "data": {
+  }
+}