You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/08/29 03:10:51 UTC
[01/25] incubator-apex-malhar git commit: MLHR-1796 - #resolve Fixed
compilation errors in benchmark application.
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/feature-AppData 7e89a86f4 -> 59622483e (forced update)
MLHR-1796 - #resolve Fixed compilation errors in benchmark application.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a980e061
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a980e061
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a980e061
Branch: refs/heads/feature-AppData
Commit: a980e061dffb14b02d4426e59f5e3cdeaa0e2b24
Parents: 0be7372
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Tue Aug 4 20:54:24 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Aug 4 20:54:24 2015 -0700
----------------------------------------------------------------------
.../algo/UniqueValueCountBenchmarkApplication.java | 9 +++++----
.../benchmark/script/RubyOperatorBenchmarkApplication.java | 2 +-
.../benchmark/memsql/MemsqlInputBenchmarkTest.java | 2 +-
.../datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java | 6 +++---
4 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a980e061/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
index 1109a85..2f25130 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java
@@ -19,10 +19,11 @@ package com.datatorrent.benchmark.algo;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.lib.algo.UniqueCounter;
-import com.datatorrent.lib.algo.UniqueCounterValue;
import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.common.partitioner.StatelessPartitioner;
+
+import com.datatorrent.lib.stream.Counter;
import com.datatorrent.lib.testbench.RandomEventGenerator;
import com.datatorrent.api.Context;
@@ -65,12 +66,12 @@ public class UniqueValueCountBenchmarkApplication implements StreamingApplicatio
dag.setInputPortAttribute(uniqCount.data, Context.PortContext.PARTITION_PARALLEL, true);
uniqCount.setCumulative(false);
- UniqueCounterValue counter = dag.addOperator("count", new UniqueCounterValue());
+ Counter counter = dag.addOperator("count", new Counter());
ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
dag.addStream("datain", randGen.integer_data, uniqCount.data);
dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL);
- dag.addStream("consoutput", converter.output, counter.data);
- dag.addStream("final", counter.count, output.input);
+ dag.addStream("consoutput", converter.output, counter.input);
+ dag.addStream("final", counter.output, output.input);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a980e061/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
index db998b2..c7e09b4 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java
@@ -22,7 +22,7 @@ import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.benchmark.RandomMapOutput;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.script.RubyOperator;
+import com.datatorrent.contrib.ruby.RubyOperator;
import com.datatorrent.lib.testbench.RandomEventGenerator;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a980e061/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
index 580e3da..abc86a1 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
@@ -53,7 +53,7 @@ public class MemsqlInputBenchmarkTest
AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore);
- MemsqlOutputOperator outputOperator = new MemsqlOutputOperator();
+ MemsqlPOJOOutputOperator outputOperator = new MemsqlPOJOOutputOperator();
outputOperator.getStore().setDatabaseUrl(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl"));
outputOperator.getStore().setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
outputOperator.setBatchSize(BATCH_SIZE);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a980e061/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
index 9195a45..04c0cb0 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
@@ -19,7 +19,7 @@ import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.memsql.MemsqlOutputOperator;
+import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator;
import com.datatorrent.lib.testbench.RandomEventGenerator;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
@@ -86,8 +86,8 @@ public class MemsqlOutputBenchmark implements StreamingApplication
randomEventGenerator.setTuplesBlast(TUPLE_BLAST);
LOG.debug("Before making output operator");
- MemsqlOutputOperator memsqlOutputOperator = dag.addOperator("memsqlOutputOperator",
- new MemsqlOutputOperator());
+ MemsqlPOJOOutputOperator memsqlOutputOperator = dag.addOperator("memsqlOutputOperator",
+ new MemsqlPOJOOutputOperator());
LOG.debug("After making output operator");
memsqlOutputOperator.setBatchSize(DEFAULT_BATCH_SIZE);
[13/25] incubator-apex-malhar git commit: Merge pull request #1534
from ishark/redisStore
Posted by da...@apache.org.
Merge pull request #1534 from ishark/redisStore
MLHR-1748 #resolve #comment Created concrete input and output operators for Redis Store
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/717168bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/717168bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/717168bc
Branch: refs/heads/feature-AppData
Commit: 717168bc8768d4376b5f724fb7a31c975878aea5
Parents: 0b3bb88 ada42ab
Author: Chandni Singh <si...@gmail.com>
Authored: Fri Aug 14 15:17:27 2015 -0700
Committer: Chandni Singh <si...@gmail.com>
Committed: Fri Aug 14 15:17:27 2015 -0700
----------------------------------------------------------------------
contrib/pom.xml | 8 +-
.../redis/AbstractRedisInputOperator.java | 225 +++++++++++++++++-
.../redis/RedisKeyValueInputOperator.java | 55 +++++
.../redis/RedisMapAsValueInputOperator.java | 45 ++++
.../contrib/redis/RedisPOJOInputOperator.java | 204 ++++++++++++++++
.../contrib/redis/RedisPOJOOutputOperator.java | 155 +++++++++++++
.../datatorrent/contrib/redis/RedisStore.java | 27 +++
.../contrib/redis/RedisInputOperatorTest.java | 193 ++++++++++++++++
.../contrib/redis/RedisPOJOOperatorTest.java | 230 +++++++++++++++++++
demos/machinedata/pom.xml | 2 +-
10 files changed, 1138 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
[17/25] incubator-apex-malhar git commit: update japicmp plugin
version
Posted by da...@apache.org.
update japicmp plugin version
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c4a6d8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c4a6d8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c4a6d8d7
Branch: refs/heads/feature-AppData
Commit: c4a6d8d75538bf623eaf803a42c27044ebbfd6da
Parents: e0ee8ab
Author: thomas <th...@datatorrent.com>
Authored: Fri Aug 21 22:45:39 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Fri Aug 21 22:45:39 2015 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c4a6d8d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba86445..ccdc00a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
<plugin>
<groupId>com.github.siom79.japicmp</groupId>
<artifactId>japicmp-maven-plugin</artifactId>
- <version>0.5.1</version>
+ <version>0.5.3</version>
<configuration>
<oldVersion>
<dependency>
[03/25] incubator-apex-malhar git commit: Removed references to name
property in BaseOperator
Posted by da...@apache.org.
Removed references to name property in BaseOperator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a0280691
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a0280691
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a0280691
Branch: refs/heads/feature-AppData
Commit: a0280691b82f54dd6e33f7cc01772a94373aa1da
Parents: 0a4250e
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Aug 3 14:17:02 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Aug 6 15:20:44 2015 -0700
----------------------------------------------------------------------
.../test/java/com/datatorrent/demos/mobile/ApplicationTest.java | 2 --
.../com/datatorrent/lib/io/SimpleSinglePortInputOperator.java | 3 ++-
.../java/com/datatorrent/lib/io/WebSocketInputOperator.java | 3 ++-
.../java/com/datatorrent/lib/io/WebSocketOutputOperator.java | 5 ++---
.../com/datatorrent/lib/multiwindow/SortedMovingWindow.java | 3 ++-
.../com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java | 1 -
.../java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java | 1 -
.../com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java | 2 --
8 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
index d58e8ff..3494417 100644
--- a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
+++ b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java
@@ -67,12 +67,10 @@ public class ApplicationTest
URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
- outputOperator.setName("testOutputOperator");
outputOperator.setUri(uri);
outputOperator.setTopic(conf.get("dt.application.MobileDemo.operator.QueryLocation.topic"));
PubSubWebSocketInputOperator<Map<String, String>> inputOperator = new PubSubWebSocketInputOperator<Map<String, String>>();
- inputOperator.setName("testInputOperator");
inputOperator.setUri(uri);
inputOperator.setTopic(conf.get("dt.application.MobileDemo.operator.LocationResults.topic"));
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
index 07bcaf5..1fbd45f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/SimpleSinglePortInputOperator.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.common.util.BaseOperator;
+import org.apache.commons.lang3.ClassUtils;
/**
* This an input operator which passes data from an asynchronous data source to a port processing thread.
@@ -60,7 +61,7 @@ public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator impl
{
isActive = true;
if (this instanceof Runnable) {
- ioThread = new Thread((Runnable)this, "io-" + this.getName());
+ ioThread = new Thread((Runnable)this, "io-" + ClassUtils.getShortClassName(this.getClass()));
ioThread.start();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index a8cfa6e..69ebfa3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
+import org.apache.commons.lang3.ClassUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
@@ -178,7 +179,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
public Thread newThread(Runnable r)
{
Thread t = new Thread(r);
- t.setName(WebSocketInputOperator.this.getName() + "-AsyncHttpClient-" + count++);
+ t.setName(ClassUtils.getShortClassName(this.getClass()) + "-AsyncHttpClient-" + count++);
return t;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
index a7ab3bd..f46ccb8 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
@@ -19,8 +19,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.concurrent.*;
-import javax.validation.constraints.NotNull;
-
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfigBean;
import com.ning.http.client.websocket.WebSocket;
@@ -32,6 +30,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.ClassUtils;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
@@ -193,7 +192,7 @@ public class WebSocketOutputOperator<T> extends BaseOperator
public Thread newThread(Runnable r)
{
Thread t = new Thread(r);
- t.setName(WebSocketOutputOperator.this.getName() + "-AsyncHttpClient-" + count++);
+ t.setName(ClassUtils.getShortClassName(this.getClass()) + "-AsyncHttpClient-" + count++);
return t;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java b/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
index 84388e4..df4d482 100644
--- a/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
+++ b/library/src/main/java/com/datatorrent/lib/multiwindow/SortedMovingWindow.java
@@ -28,6 +28,7 @@ import javax.validation.constraints.NotNull;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.google.common.base.Function;
+import org.apache.commons.lang.ClassUtils;
/**
*
@@ -114,7 +115,7 @@ public class SortedMovingWindow<T, K> extends AbstractSlidingWindow<T, List<T>>
k = ((Comparable<T>) expiredTuple).compareTo(minElemInSortedList);
} else {
errorOutput.emit(expiredTuple);
- throw new IllegalArgumentException("Operator \"" + getName() + "\" encounters an invalid tuple " + expiredTuple + "\nNeither the tuple is comparable Nor Comparator is specified!");
+ throw new IllegalArgumentException("Operator \"" + ClassUtils.getShortClassName(this.getClass()) + "\" encounters an invalid tuple " + expiredTuple + "\nNeither the tuple is comparable Nor Comparator is specified!");
}
} else {
k = comparator.compare(expiredTuple, minElemInSortedList);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
index 3adf90b..a3f5dc0 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
@@ -92,7 +92,6 @@ public class HttpJsonChunksInputOperatorTest
CollectorTestSink sink = new CollectorTestSink();
operator.outputPort.setSink(sink);
- operator.setName("testHttpInputNode");
operator.setUrl(new URI(url));
operator.setup(null);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
index 538b6b4..d8b3778 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
@@ -80,7 +80,6 @@ public class HttpLinesInputOperatorTest
final HttpLinesInputOperator operator = new HttpLinesInputOperator();
CollectorTestSink<String> sink = TestUtils.setSink(operator.outputPort, new CollectorTestSink<String>());
- operator.setName("testHttpInputNode");
operator.setUrl(new URI(url));
operator.setup(null);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a0280691/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
index 4bfcf45..778524b 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
@@ -51,12 +51,10 @@ public class PubSubWebSocketOperatorTest
URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
- outputOperator.setName("testOutputOperator");
outputOperator.setUri(uri);
outputOperator.setTopic("testTopic");
PubSubWebSocketInputOperator<Object> inputOperator = new PubSubWebSocketInputOperator<Object>();
- inputOperator.setName("testInputOperator");
inputOperator.setUri(uri);
inputOperator.setTopic("testTopic");
[09/25] incubator-apex-malhar git commit: Added default NOOP instance
for idempotency manager
Posted by da...@apache.org.
Added default NOOP instance for idempotency manager
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4dc4788f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4dc4788f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4dc4788f
Branch: refs/heads/feature-AppData
Commit: 4dc4788f74178509eb01cc1d4402522601095dcd
Parents: 13a3fbe
Author: ishark <is...@datatorrent.com>
Authored: Tue Aug 11 15:10:01 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Tue Aug 11 15:10:01 2015 -0700
----------------------------------------------------------------------
contrib/pom.xml | 6 ------
.../contrib/rabbitmq/AbstractRabbitMQInputOperator.java | 1 +
2 files changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4dc4788f/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 76e8144..9776e2f 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -565,11 +565,5 @@
<version>${dt.framework.version}</version>
<type>jar</type>
</dependency>
- <dependency>
- <groupId>com.datatorrent</groupId>
- <artifactId>dt-engine</artifactId>
- <version>${dt.framework.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4dc4788f/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index e408f5e..955a2c8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -111,6 +111,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
currentWindowRecoveryState = new HashMap<Long, byte[]>();
pendingAck = new HashSet<Long>();
recoveredTags = new HashSet<Long>();
+ idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager();
}
[20/25] incubator-apex-malhar git commit: Fix malhar-contrib
dependency version
Posted by da...@apache.org.
Fix malhar-contrib dependency version
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6155673e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6155673e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6155673e
Branch: refs/heads/feature-AppData
Commit: 6155673e5518d24f0459a29b00b6eb05f04556bc
Parents: e7a475c
Author: thomas <th...@datatorrent.com>
Authored: Sat Aug 22 00:01:46 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Sat Aug 22 00:01:46 2015 -0700
----------------------------------------------------------------------
apps/logstream/pom.xml | 2 +-
apps/pom.xml | 1 -
2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6155673e/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index 9643daf..3359efc 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -53,7 +53,7 @@
<dependency>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-contrib</artifactId>
- <version>${datatorrent.version}</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6155673e/apps/pom.xml
----------------------------------------------------------------------
diff --git a/apps/pom.xml b/apps/pom.xml
index b984069..34b2376 100644
--- a/apps/pom.xml
+++ b/apps/pom.xml
@@ -18,7 +18,6 @@
<properties>
<!-- change this if you desire to use a different version of DataTorrent -->
- <datatorrent.version>3.0.0</datatorrent.version>
<datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
<maven.deploy.skip>true</maven.deploy.skip>
<maven.install.skip>true</maven.install.skip>
[02/25] incubator-apex-malhar git commit: Fix project version.
Posted by da...@apache.org.
Fix project version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0a4250e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0a4250e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0a4250e0
Branch: refs/heads/feature-AppData
Commit: 0a4250e0bda347562784d54f7c1b6add0685ac37
Parents: a980e06
Author: thomas <th...@datatorrent.com>
Authored: Tue Aug 4 21:26:40 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Tue Aug 4 21:26:40 2015 -0700
----------------------------------------------------------------------
benchmark/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a4250e0/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index fa6a975..0d07203 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -4,7 +4,7 @@
<parent>
<artifactId>malhar</artifactId>
<groupId>com.datatorrent</groupId>
- <version>3.0.0</version>
+ <version>3.1.0-SNAPSHOT</version>
</parent>
<groupId>com.datatorrent</groupId>
[06/25] incubator-apex-malhar git commit: enable server plugin
Posted by da...@apache.org.
enable server plugin
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2e5813d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2e5813d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2e5813d4
Branch: refs/heads/feature-AppData
Commit: 2e5813d4a395f8f3d85ffd7f518bdf7672c3b9cc
Parents: 93ce29c
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Mon Aug 10 11:53:27 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Mon Aug 10 11:56:29 2015 -0700
----------------------------------------------------------------------
apps/logstream/pom.xml | 1 +
demos/pom.xml | 1 +
pom.xml | 52 +++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 54 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2e5813d4/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index 7321baa..ce0f4ce 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -15,6 +15,7 @@
<properties>
<maven.deploy.skip>false</maven.deploy.skip>
<skipTests>false</skipTests>
+ <semver.plugin.skip>true</semver.plugin.skip>
</properties>
<name>Logstream Application</name>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2e5813d4/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 64c6c4a..a5601a5 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -30,6 +30,7 @@
<properties>
<datatorrent.version>3.0.0</datatorrent.version>
<datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
+ <semver.plugin.skip>true</semver.plugin.skip>
</properties>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2e5813d4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 81a699b..ba86445 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,8 +43,60 @@
<jackson.version>1.9.2</jackson.version>
<jersey.version>1.9</jersey.version>
<jetty.version>8.1.10.v20130312</jetty.version>
+ <semver.plugin.skip>false</semver.plugin.skip>
</properties>
+ <profiles>
+ <profile>
+ <activation>
+ <file>
+ <exists>src</exists>
+ </file>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.github.siom79.japicmp</groupId>
+ <artifactId>japicmp-maven-plugin</artifactId>
+ <version>0.5.1</version>
+ <configuration>
+ <oldVersion>
+ <dependency>
+ <groupId>com.datatorrent</groupId>
+ <artifactId>${project.artifactId}</artifactId>
+ <version>3.0.0</version>
+ </dependency>
+ </oldVersion>
+ <newVersion>
+ <file>
+ <path>${project.build.directory}/${project.artifactId}-${project.version}.jar</path>
+ </file>
+ </newVersion>
+ <parameter>
+ <onlyModified>true</onlyModified>
+ <accessModifier>protected</accessModifier>
+ <breakBuildOnModifications>false</breakBuildOnModifications>
+ <breakBuildOnBinaryIncompatibleModifications>true</breakBuildOnBinaryIncompatibleModifications>
+ <onlyBinaryIncompatible>false</onlyBinaryIncompatible>
+ <includeSynthetic>false</includeSynthetic>
+ <ignoreMissingClasses>true</ignoreMissingClasses>
+ </parameter>
+ <skip>${semver.plugin.skip}</skip>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>cmp</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<modules>
<module>library</module>
<module>contrib</module>
[19/25] incubator-apex-malhar git commit: Created new release branch.
Updating version to 3.2.0-SNAPSHOT
Posted by da...@apache.org.
Created new release branch. Updating version to 3.2.0-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e7a475c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e7a475c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e7a475c1
Branch: refs/heads/feature-AppData
Commit: e7a475c11f3b2809de1be859c142738d67dda39f
Parents: 3e07843
Author: thomas <th...@datatorrent.com>
Authored: Fri Aug 21 23:55:05 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Fri Aug 21 23:55:05 2015 -0700
----------------------------------------------------------------------
apps/logstream/pom.xml | 2 +-
apps/pom.xml | 2 +-
benchmark/pom.xml | 2 +-
contrib/pom.xml | 2 +-
demos/echoserver/pom.xml | 2 +-
demos/frauddetect/pom.xml | 4 ++--
demos/machinedata/pom.xml | 4 ++--
demos/mobile/pom.xml | 4 ++--
demos/mrmonitor/pom.xml | 4 ++--
demos/mroperator/pom.xml | 4 ++--
demos/pi/pom.xml | 4 ++--
demos/pom.xml | 2 +-
demos/r/pom.xml | 4 ++--
demos/twitter/pom.xml | 4 ++--
demos/uniquecount/pom.xml | 4 ++--
demos/wordcount/pom.xml | 4 ++--
demos/yahoofinance/pom.xml | 4 ++--
library/pom.xml | 2 +-
pom.xml | 2 +-
samples/pom.xml | 2 +-
20 files changed, 31 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index 67df24c..9643daf 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -4,7 +4,7 @@
<parent>
<artifactId>malhar-apps</artifactId>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<groupId>com.datatorrent</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/apps/pom.xml
----------------------------------------------------------------------
diff --git a/apps/pom.xml b/apps/pom.xml
index e816985..b984069 100644
--- a/apps/pom.xml
+++ b/apps/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>malhar-apps</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index 0d07203..a2a7de2 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -4,7 +4,7 @@
<parent>
<artifactId>malhar</artifactId>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<groupId>com.datatorrent</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 50d7234..ee3c8bc 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>malhar-contrib</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/echoserver/pom.xml
----------------------------------------------------------------------
diff --git a/demos/echoserver/pom.xml b/demos/echoserver/pom.xml
index 3d29f7f..ab64811 100644
--- a/demos/echoserver/pom.xml
+++ b/demos/echoserver/pom.xml
@@ -4,7 +4,7 @@
<parent>
<artifactId>malhar-demos</artifactId>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<groupId>com.datatorrent</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/frauddetect/pom.xml
----------------------------------------------------------------------
diff --git a/demos/frauddetect/pom.xml b/demos/frauddetect/pom.xml
index 13c9fcd..1313311 100644
--- a/demos/frauddetect/pom.xml
+++ b/demos/frauddetect/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>frauddetect-demo</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/machinedata/pom.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/pom.xml b/demos/machinedata/pom.xml
index 3498d0d..e36d83f 100644
--- a/demos/machinedata/pom.xml
+++ b/demos/machinedata/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>machinedata-demo</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/mobile/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mobile/pom.xml b/demos/mobile/pom.xml
index cd55a7b..6dfc7f6 100644
--- a/demos/mobile/pom.xml
+++ b/demos/mobile/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>mobile-demo</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/mrmonitor/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/pom.xml b/demos/mrmonitor/pom.xml
index 181343a..7e04f60 100644
--- a/demos/mrmonitor/pom.xml
+++ b/demos/mrmonitor/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>mrmonitor</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/mroperator/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mroperator/pom.xml b/demos/mroperator/pom.xml
index 4e09392..abf9abe 100644
--- a/demos/mroperator/pom.xml
+++ b/demos/mroperator/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>mroperator</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/pi/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pi/pom.xml b/demos/pi/pom.xml
index 5719b2c..33ee252 100644
--- a/demos/pi/pom.xml
+++ b/demos/pi/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>pi-demo</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 7b3aa08..2e93e02 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>malhar-demos</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/r/pom.xml
----------------------------------------------------------------------
diff --git a/demos/r/pom.xml b/demos/r/pom.xml
index 42c203c..3b036ee 100644
--- a/demos/r/pom.xml
+++ b/demos/r/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>r-demo</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/pom.xml b/demos/twitter/pom.xml
index 8a0bec9..09ce1e2 100644
--- a/demos/twitter/pom.xml
+++ b/demos/twitter/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>twitter-demo</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/uniquecount/pom.xml
----------------------------------------------------------------------
diff --git a/demos/uniquecount/pom.xml b/demos/uniquecount/pom.xml
index 8b7d3fc..a75c56e 100644
--- a/demos/uniquecount/pom.xml
+++ b/demos/uniquecount/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>uniquecount</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/wordcount/pom.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/pom.xml b/demos/wordcount/pom.xml
index 779b4f7..a3834ba 100644
--- a/demos/wordcount/pom.xml
+++ b/demos/wordcount/pom.xml
@@ -4,7 +4,7 @@
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>wordcount-demo</artifactId>
<packaging>jar</packaging>
@@ -14,7 +14,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/demos/yahoofinance/pom.xml
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/pom.xml b/demos/yahoofinance/pom.xml
index 3b4db0c..80bbb4a 100644
--- a/demos/yahoofinance/pom.xml
+++ b/demos/yahoofinance/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<artifactId>yahoo-finance-demo</artifactId>
<packaging>jar</packaging>
@@ -13,7 +13,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-demos</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 77b8506..defd214 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>malhar-library</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ccdc00a..c95f524 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
</parent>
<artifactId>malhar</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Malhar Open Source</name>
<url>https://www.datatorrent.com/</url>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/e7a475c1/samples/pom.xml
----------------------------------------------------------------------
diff --git a/samples/pom.xml b/samples/pom.xml
index 7e8b274..f1c13ba 100644
--- a/samples/pom.xml
+++ b/samples/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>malhar</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>malhar-samples</artifactId>
[05/25] incubator-apex-malhar git commit: omitting type from FieldInfo
Posted by da...@apache.org.
omitting type from FieldInfo
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f6d85fea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f6d85fea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f6d85fea
Branch: refs/heads/feature-AppData
Commit: f6d85feaf573f682c7f1670904c9d7c073b50b37
Parents: f40ba34
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Fri Aug 7 14:54:25 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Fri Aug 7 17:09:20 2015 -0700
----------------------------------------------------------------------
library/src/main/java/com/datatorrent/lib/util/FieldInfo.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f6d85fea/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
index a4e4923..5b41ace 100644
--- a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
+++ b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java
@@ -100,6 +100,7 @@ public class FieldInfo
/**
* the Java type of the column
+ * @omitFromUI
*/
public void setType(SupportType type)
{
[04/25] incubator-apex-malhar git commit: Fixed Twitter demo for App
Builder
Posted by da...@apache.org.
Fixed Twitter demo for App Builder
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f40ba346
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f40ba346
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f40ba346
Branch: refs/heads/feature-AppData
Commit: f40ba346ae660e1b89de1ebbb3f8e1f7ba31cad0
Parents: a028069
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Aug 6 16:07:03 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Thu Aug 6 22:14:05 2015 -0700
----------------------------------------------------------------------
.../contrib/twitter/TwitterSampleInput.java | 32 ++++++++++++++++++++
demos/pom.xml | 4 +--
demos/twitter/pom.xml | 2 +-
.../demos/twitter/WindowedTopCounter.java | 5 +++
4 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f40ba346/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java b/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java
index 916bea8..9daef7f 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java
@@ -293,6 +293,14 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope
}
/**
+ * @return the consumerKey
+ */
+ public String getConsumerKey()
+ {
+ return consumerKey;
+ }
+
+ /**
* @param consumerKey the consumerKey to set
*/
public void setConsumerKey(String consumerKey)
@@ -301,6 +309,14 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope
}
/**
+ * @return the consumerSecret
+ */
+ public String getConsumerSecret()
+ {
+ return consumerSecret;
+ }
+
+ /**
* @param consumerSecret the consumerSecret to set
*/
public void setConsumerSecret(String consumerSecret)
@@ -309,6 +325,14 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope
}
/**
+ * @return the accessToken
+ */
+ public String getAccessToken()
+ {
+ return accessToken;
+ }
+
+ /**
* @param accessToken the accessToken to set
*/
public void setAccessToken(String accessToken)
@@ -317,6 +341,14 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope
}
/**
+ * @return the accessTokenSecret
+ */
+ public String getAccessTokenSecret()
+ {
+ return accessTokenSecret;
+ }
+
+ /**
* @param accessTokenSecret the accessTokenSecret to set
*/
public void setAccessTokenSecret(String accessTokenSecret)
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f40ba346/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 64c6c4a..c17120f 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -173,7 +173,7 @@
<dependency>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-library</artifactId>
- <version>${datatorrent.version}</version>
+ <version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
@@ -184,7 +184,7 @@
<dependency>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-library</artifactId>
- <version>${datatorrent.version}</version>
+ <version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f40ba346/demos/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/pom.xml b/demos/twitter/pom.xml
index c55ee79..8a0bec9 100644
--- a/demos/twitter/pom.xml
+++ b/demos/twitter/pom.xml
@@ -54,7 +54,7 @@
<dependency>
<groupId>com.datatorrent</groupId>
<artifactId>malhar-contrib</artifactId>
- <version>${datatorrent.version}</version>
+ <version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f40ba346/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
index 2edf7fd..3354ed4 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java
@@ -179,6 +179,11 @@ public class WindowedTopCounter<T> extends BaseOperator
topCount = count;
}
+ public int getTopCount()
+ {
+ return topCount;
+ }
+
/**
* @return the windows
*/
[24/25] incubator-apex-malhar git commit: Fix NPE if an embeddable
query info provider is not set on the snapshot server.
Posted by da...@apache.org.
Fix NPE if an embeddable query info provider is not set on the snapshot server.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b24e99f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b24e99f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b24e99f7
Branch: refs/heads/feature-AppData
Commit: b24e99f74bb09350eeac22b5b7de482f86a6ccd7
Parents: d19f746
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Wed Aug 26 18:00:23 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:32 2015 -0700
----------------------------------------------------------------------
.../lib/appdata/snapshot/AbstractAppDataSnapshotServer.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b24e99f7/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index a51908f..af89109 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -158,7 +158,9 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
@Override
final public void activate(OperatorContext ctx)
{
- embeddableQueryInfoProvider.activate(ctx);
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.activate(ctx);
+ }
}
@SuppressWarnings("unchecked")
[18/25] incubator-apex-malhar git commit: Remove redundant version
Posted by da...@apache.org.
Remove redundant version
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3e07843f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3e07843f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3e07843f
Branch: refs/heads/feature-AppData
Commit: 3e07843f8396a6f431410ab71eefcc1df9c175c0
Parents: c4a6d8d
Author: thomas <th...@datatorrent.com>
Authored: Fri Aug 21 23:49:16 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Fri Aug 21 23:49:16 2015 -0700
----------------------------------------------------------------------
apps/logstream/pom.xml | 1 -
demos/echoserver/pom.xml | 1 -
2 files changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3e07843f/apps/logstream/pom.xml
----------------------------------------------------------------------
diff --git a/apps/logstream/pom.xml b/apps/logstream/pom.xml
index ce0f4ce..67df24c 100644
--- a/apps/logstream/pom.xml
+++ b/apps/logstream/pom.xml
@@ -8,7 +8,6 @@
</parent>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
<artifactId>logstream</artifactId>
<packaging>jar</packaging>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3e07843f/demos/echoserver/pom.xml
----------------------------------------------------------------------
diff --git a/demos/echoserver/pom.xml b/demos/echoserver/pom.xml
index ad20744..3d29f7f 100644
--- a/demos/echoserver/pom.xml
+++ b/demos/echoserver/pom.xml
@@ -8,7 +8,6 @@
</parent>
<groupId>com.datatorrent</groupId>
- <version>3.1.0-SNAPSHOT</version>
<artifactId>echoserver</artifactId>
<packaging>jar</packaging>
[12/25] incubator-apex-malhar git commit: Removed checking all the
window ids in idempotency storage before replay
Posted by da...@apache.org.
Removed checking all the window ids in idempotency storage before replay
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ada42ab9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ada42ab9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ada42ab9
Branch: refs/heads/feature-AppData
Commit: ada42ab9ba0fe17162a38162b6d889afc91e741c
Parents: a57a3d7
Author: ishark <is...@datatorrent.com>
Authored: Fri Aug 14 14:37:24 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 14 14:37:24 2015 -0700
----------------------------------------------------------------------
.../redis/AbstractRedisInputOperator.java | 23 ++++++++++----------
1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ada42ab9/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 7f79bd0..260fbf6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -35,7 +35,8 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
* @category Input
* @tags redis, key value
*
- * @param <T> The tuple type.
+ * @param <T>
+ * The tuple type.
* @since 0.9.3
*/
public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
@@ -47,6 +48,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
private transient Integer backupOffset;
private int scanCount;
private transient boolean replay;
+ private transient boolean skipOffsetRecovery = true;
@NotNull
private IdempotentStorageManager idempotentStorageManager;
@@ -92,33 +94,27 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
private void replay(long windowId)
{
try {
- if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+ // For first recovered window, offset is already part of recovery state.
+ // So skip reading from idempotency manager
+ if (!skipOffsetRecovery) {
// Begin offset for this window is recovery offset stored for the last
// window
RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1);
recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
}
-
+ skipOffsetRecovery = false;
RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId);
recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
if (recoveryState.scanOffsetAtBeginWindow != null) {
scanOffset = recoveryState.scanOffsetAtBeginWindow;
}
replay = true;
+
} catch (IOException e) {
DTThrowable.rethrow(e);
}
}
- private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException
- {
- long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId());
- if(windowsIds.length == 0 || windowId < windowsIds[0] || windowId > windowsIds[windowsIds.length - 1]) {
- return false;
- }
- return true ;
- }
-
private void scanKeysFromOffset()
{
if (!scanComplete) {
@@ -157,11 +153,14 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOp
scanComplete = false;
scanParameters = new ScanParams();
scanParameters.count(scanCount);
+
// For the 1st window after checkpoint, windowID - 1 would not have recovery
// offset stored in idempotentStorageManager
// But recoveryOffset is non-transient, so will be recovered with
// checkPointing
+ // Offset recovery from idempotency storage can be skipped in this case
scanOffset = recoveryState.scanOffsetAtBeginWindow;
+ skipOffsetRecovery = true;
}
@Override
[22/25] incubator-apex-malhar git commit: Added the ability to sort
schemas returned in a schema result.
Posted by da...@apache.org.
Added the ability to sort schemas returned in a schema result.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d19f7463
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d19f7463
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d19f7463
Branch: refs/heads/feature-AppData
Commit: d19f7463a34ae216aab605c3d16b6bdfdfa78b29
Parents: 819e175
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Aug 13 11:10:46 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:32 2015 -0700
----------------------------------------------------------------------
.../appdata/schemas/SchemaRegistryMultiple.java | 28 +++++++++++++++++---
1 file changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d19f7463/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
index 9833e94..832cf0a 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultiple.java
@@ -15,8 +15,11 @@
*/
package com.datatorrent.lib.appdata.schemas;
+import com.google.common.base.Preconditions;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -35,6 +38,7 @@ public class SchemaRegistryMultiple implements SchemaRegistry, Serializable
* The dimensional table which holds the mapping from schema keys to schemas.
*/
private DimensionalTable<Schema> table;
+ private Comparator<Schema> schemaComparator;
/**
* Constructor for serialization.
@@ -53,20 +57,36 @@ public class SchemaRegistryMultiple implements SchemaRegistry, Serializable
table = new DimensionalTable<Schema>(schemaKeys);
}
+ /**
+ * The names of all the schema keys for all schemas in this registry.
+ * @param schemaKeys The names of all the schema keys for all schemas in this registry.
+ * @param schemaComparator The comparator used to order the schemas returned in the {@link SchemaResult} produced
+ * by {@link SchemaRegistryMultiple#getSchemaResult(com.datatorrent.lib.appdata.schemas.SchemaQuery)}
+ */
+ public SchemaRegistryMultiple(List<String> schemaKeys,
+ Comparator<Schema> schemaComparator)
+ {
+ this(schemaKeys);
+ this.schemaComparator = Preconditions.checkNotNull(schemaComparator);
+ }
+
@Override
public SchemaResult getSchemaResult(SchemaQuery schemaQuery)
{
Map<String, String> schemaKeys = schemaQuery.getSchemaKeys();
List<Schema> data = null;
- if(schemaKeys == null) {
+ if (schemaKeys == null) {
data = table.getAllDataPoints();
- }
- else {
+ } else {
data = table.getDataPoints(schemaKeys);
}
- if(data.isEmpty()) {
+ if (schemaComparator != null) {
+ Collections.sort(data, schemaComparator);
+ }
+
+ if (data.isEmpty()) {
return null;
}
[16/25] incubator-apex-malhar git commit: Merge branch
'ishark-fixVersionCompatibilityIssue' into v3.1.0
Posted by da...@apache.org.
Merge branch 'ishark-fixVersionCompatibilityIssue' into v3.1.0
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/e0ee8ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/e0ee8ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/e0ee8ab6
Branch: refs/heads/feature-AppData
Commit: e0ee8ab62b2a0d99dbc79bd4726dc33d3dfed96f
Parents: 0b31fee 731b8bb
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Tue Aug 18 13:51:46 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Tue Aug 18 13:51:46 2015 -0700
----------------------------------------------------------------------
.../contrib/redis/AbstractRedisInputOperator.java | 7 +++++--
.../contrib/redis/RedisKeyValueInputOperator.java | 9 +++++++++
.../contrib/redis/RedisMapAsValueInputOperator.java | 8 ++++++++
.../datatorrent/contrib/redis/RedisPOJOInputOperator.java | 7 +++++++
4 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[14/25] incubator-apex-malhar git commit: Merge pull request #1533
from 243826/semantic-versioning-minor
Posted by da...@apache.org.
Merge pull request #1533 from 243826/semantic-versioning-minor
enable server plugin
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0b31fee5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0b31fee5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0b31fee5
Branch: refs/heads/feature-AppData
Commit: 0b31fee52afdbe11f6100b3881481b1962bb164c
Parents: 717168b 2e5813d
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Mon Aug 17 17:42:10 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Mon Aug 17 17:42:10 2015 -0700
----------------------------------------------------------------------
apps/logstream/pom.xml | 1 +
demos/pom.xml | 1 +
pom.xml | 52 +++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 54 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0b31fee5/demos/pom.xml
----------------------------------------------------------------------
[08/25] incubator-apex-malhar git commit: MLHR-1734 #resolve Added
idempotency changes for RabbitMQ input and output operator
Posted by da...@apache.org.
MLHR-1734 #resolve Added idempotency changes for RabbitMQ input and output operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/13a3fbea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/13a3fbea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/13a3fbea
Branch: refs/heads/feature-AppData
Commit: 13a3fbea74b7deaf674c5f42dd945ebd01e17f65
Parents: 8e94665
Author: ishark <is...@datatorrent.com>
Authored: Mon Jun 29 16:14:14 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Mon Aug 10 19:00:55 2015 -0700
----------------------------------------------------------------------
contrib/pom.xml | 6 +
.../rabbitmq/AbstractRabbitMQInputOperator.java | 141 +++++++++++++++++--
.../AbstractRabbitMQOutputOperator.java | 65 ++++++++-
...bstractSinglePortRabbitMQOutputOperator.java | 5 +-
.../rabbitmq/RabbitMQInputOperatorTest.java | 86 ++++++++---
.../rabbitmq/RabbitMQOutputOperatorTest.java | 28 ++--
6 files changed, 283 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 9776e2f..76e8144 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -565,5 +565,11 @@
<version>${dt.framework.version}</version>
<type>jar</type>
</dependency>
+ <dependency>
+ <groupId>com.datatorrent</groupId>
+ <artifactId>dt-engine</artifactId>
+ <version>${dt.framework.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 06b3b88..e408f5e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -16,12 +16,22 @@
package com.datatorrent.contrib.rabbitmq;
import com.datatorrent.api.*;
-import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
import com.rabbitmq.client.*;
+
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+
import javax.validation.constraints.NotNull;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,9 +71,9 @@ import org.slf4j.LoggerFactory;
*
* @since 0.3.2
*/
-public abstract class AbstractRabbitMQInputOperator<T>
- implements InputOperator,
-Operator.ActivationListener<OperatorContext>
+public abstract class AbstractRabbitMQInputOperator<T> implements
+ InputOperator, Operator.ActivationListener<OperatorContext>,
+ Operator.CheckpointListener
{
private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);
@NotNull
@@ -87,8 +97,23 @@ Operator.ActivationListener<OperatorContext>
protected transient Channel channel;
protected transient TracingConsumer tracingConsumer;
protected transient String cTag;
- protected transient ArrayBlockingQueue<byte[]> holdingBuffer;
+
+ protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> holdingBuffer;
+ private IdempotentStorageManager idempotentStorageManager;
+ protected final transient Map<Long, byte[]> currentWindowRecoveryState;
+ private transient final Set<Long> pendingAck;
+ private transient final Set<Long> recoveredTags;
+ private transient long currentWindowId;
+ private transient int operatorContextId;
+
+ public AbstractRabbitMQInputOperator()
+ {
+ currentWindowRecoveryState = new HashMap<Long, byte[]>();
+ pendingAck = new HashSet<Long>();
+ recoveredTags = new HashSet<Long>();
+ }
+
/**
* define a consumer which can asynchronously receive data,
* and added to holdingBuffer
@@ -124,8 +149,19 @@ Operator.ActivationListener<OperatorContext>
@Override
public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
- holdingBuffer.add(body);
-// logger.debug("Received Async message:" + new String(body)+" buffersize:"+holdingBuffer.size());
+ long tag = envelope.getDeliveryTag();
+ if(envelope.isRedeliver() && (recoveredTags.contains(tag) || pendingAck.contains(tag)))
+ {
+ if(recoveredTags.contains(tag)) {
+ pendingAck.add(tag);
+ }
+ return;
+ }
+
+ // Acknowledgements are sent at the end of the window after adding to idempotency manager
+ pendingAck.add(tag);
+ holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body));
+ logger.debug("Received Async message: {} buffersize: {} ", new String(body), holdingBuffer.size());
}
}
@@ -137,7 +173,9 @@ Operator.ActivationListener<OperatorContext>
ntuples = holdingBuffer.size();
}
for (int i = ntuples; i-- > 0;) {
- emitTuple(holdingBuffer.poll());
+ KeyValPair<Long, byte[]> message = holdingBuffer.poll();
+ currentWindowRecoveryState.put(message.getKey(), message.getValue());
+ emitTuple(message.getValue());
}
}
@@ -146,22 +184,72 @@ Operator.ActivationListener<OperatorContext>
@Override
public void beginWindow(long windowId)
{
+ currentWindowId = windowId;
+ if (windowId <= this.idempotentStorageManager.getLargestRecoveryWindow()) {
+ replay(windowId);
+ }
}
+ @SuppressWarnings("unchecked")
+ private void replay(long windowId) {
+ Map<Long, byte[]> recoveredData;
+ try {
+ recoveredData = (Map<Long, byte[]>) this.idempotentStorageManager.load(operatorContextId, windowId);
+ if (recoveredData == null) {
+ return;
+ }
+ for (Entry<Long, byte[]> recoveredEntry : recoveredData.entrySet()) {
+ recoveredTags.add(recoveredEntry.getKey());
+ emitTuple(recoveredEntry.getValue());
+ }
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+
@Override
public void endWindow()
{
+ //No more messages can be consumed now. so we will call emit tuples once more
+ //so that any pending messages can be emitted.
+ KeyValPair<Long, byte[]> message;
+ while ((message = holdingBuffer.poll()) != null) {
+ currentWindowRecoveryState.put(message.getKey(), message.getValue());
+ emitTuple(message.getValue());
+ }
+
+ try {
+ this.idempotentStorageManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+
+ currentWindowRecoveryState.clear();
+
+ for (Long deliveryTag : pendingAck) {
+ try {
+ channel.basicAck(deliveryTag, false);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ pendingAck.clear();
}
@Override
public void setup(OperatorContext context)
{
- holdingBuffer = new ArrayBlockingQueue<byte[]>(bufferSize);
+ this.operatorContextId = context.getId();
+ holdingBuffer = new ArrayBlockingQueue<KeyValPair<Long, byte[]>>(bufferSize);
+ this.idempotentStorageManager.setup(context);
}
@Override
public void teardown()
{
+ this.idempotentStorageManager.teardown();
}
@Override
@@ -178,10 +266,12 @@ Operator.ActivationListener<OperatorContext>
channel = connection.createChannel();
channel.exchangeDeclare(exchange, exchangeType);
+ boolean resetQueueName = false;
if (queueName == null){
// unique queuename is generated
// used in case of fanout exchange
queueName = channel.queueDeclare().getQueue();
+ resetQueueName = true;
} else {
// user supplied name
// used in case of direct exchange
@@ -193,7 +283,11 @@ Operator.ActivationListener<OperatorContext>
// consumer = new QueueingConsumer(channel);
// channel.basicConsume(queueName, true, consumer);
tracingConsumer = new TracingConsumer(channel);
- cTag = channel.basicConsume(queueName, true, tracingConsumer);
+ cTag = channel.basicConsume(queueName, false, tracingConsumer);
+ if(resetQueueName)
+ {
+ queueName = null;
+ }
}
catch (IOException ex) {
throw new RuntimeException("Connection Failure", ex);
@@ -211,6 +305,23 @@ Operator.ActivationListener<OperatorContext>
logger.debug(ex.toString());
}
}
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ idempotentStorageManager.deleteUpTo(operatorContextId, windowId);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("committing", e);
+ }
+ }
+
public void setTupleBlast(int i)
{
this.tuple_blast = i;
@@ -275,5 +386,15 @@ Operator.ActivationListener<OperatorContext>
{
this.routingKey = routingKey;
}
+
+ public IdempotentStorageManager getIdempotentStorageManager() {
+ return idempotentStorageManager;
+ }
+
+ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {
+ this.idempotentStorageManager = idempotentStorageManager;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
index a78febb..cc6b7db 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
@@ -16,12 +16,16 @@
package com.datatorrent.contrib.rabbitmq;
import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.api.Context.OperatorContext;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
+
import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,21 +70,70 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
transient Channel channel = null;
transient String exchange = "testEx";
transient String queueName="testQ";
+
+ private IdempotentStorageManager idempotentStorageManager;
+ private transient long currentWindowId;
+ private transient long largestRecoveryWindowId;
+ private transient int operatorContextId;
+ protected transient boolean skipProcessingTuple = false;
+ private transient OperatorContext context;
+
@Override
public void setup(OperatorContext context)
{
+ // Needed to setup idempotency storage manager in setter
+ this.context = context;
+ this.operatorContextId = context.getId();
+
try {
connFactory.setHost("localhost");
connection = connFactory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
-// channel.queueDeclare(queueName, false, false, false, null);
+
+ this.idempotentStorageManager.setup(context);
+
}
catch (IOException ex) {
logger.debug(ex.toString());
+ DTThrowable.rethrow(ex);
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ largestRecoveryWindowId = idempotentStorageManager.getLargestRecoveryWindow();
+ if (windowId <= largestRecoveryWindowId) {
+ // Do not resend already sent tuples
+ skipProcessingTuple = true;
+ }
+ else
+ {
+ skipProcessingTuple = false;
}
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void endWindow()
+ {
+ if(currentWindowId < largestRecoveryWindowId)
+ {
+ // ignore
+ return;
+ }
+ try {
+ idempotentStorageManager.save("processedWindow", operatorContextId, currentWindowId);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
public void setQueueName(String queueName) {
this.queueName = queueName;
@@ -95,9 +148,19 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
try {
channel.close();
connection.close();
+ this.idempotentStorageManager.teardown();
}
catch (IOException ex) {
logger.debug(ex.toString());
}
}
+
+ public IdempotentStorageManager getIdempotentStorageManager() {
+ return idempotentStorageManager;
+ }
+
+ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {
+ this.idempotentStorageManager = idempotentStorageManager;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
index c16f70f..8e6ff39 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
@@ -60,7 +60,10 @@ public abstract class AbstractSinglePortRabbitMQOutputOperator<T> extends Abstra
@Override
public void process(T tuple)
{
- processTuple(tuple); // This is an abstract call
+ if(!skipProcessingTuple)
+ {
+ processTuple(tuple); // This is an abstract call
+ }
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
index 041e362..a14f4e7 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
@@ -25,17 +25,21 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.contrib.helper.CollectorModule;
import com.datatorrent.contrib.helper.MessageQueueTestHelper;
-
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
-
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -44,7 +48,7 @@ import com.datatorrent.netlet.util.DTThrowable;
public class RabbitMQInputOperatorTest
{
private static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
-
+
public static final class TestStringRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String>
{
@Override
@@ -75,7 +79,6 @@ public class RabbitMQInputOperatorTest
connection = connFactory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
-// channel.queueDeclare(queueName, false, false, false, null);
}
public void setQueueName(String queueName)
@@ -86,9 +89,7 @@ public class RabbitMQInputOperatorTest
public void process(Object message) throws IOException
{
String msg = message.toString();
-// logger.debug("publish:" + msg);
channel.basicPublish(exchange, "", null, msg.getBytes());
-// channel.basicPublish("", queueName, null, msg.getBytes());
}
public void teardown() throws IOException
@@ -100,12 +101,11 @@ public class RabbitMQInputOperatorTest
public void generateMessages(int msgCount) throws InterruptedException, IOException
{
for (int i = 0; i < msgCount; i++) {
-
- ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages();
- for(int j =0; j < dataMaps.size(); j++)
- {
- process(dataMaps.get(j));
- }
+
+ ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages();
+ for (int j = 0; j < dataMaps.size(); j++) {
+ process(dataMaps.get(j));
+ }
}
}
@@ -124,6 +124,8 @@ public class RabbitMQInputOperatorTest
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
+ consumer.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
final CollectorModule<byte[]> collector = dag.addOperator("Collector", new CollectorModule<byte[]>());
consumer.setHost("localhost");
@@ -144,7 +146,7 @@ public class RabbitMQInputOperatorTest
public void run()
{
long startTms = System.currentTimeMillis();
- long timeout = 10000L;
+ long timeout = 100000L;
try {
while (!collector.inputPort.collections.containsKey("collector") && System.currentTimeMillis() - startTms < timeout) {
Thread.sleep(500);
@@ -153,16 +155,14 @@ public class RabbitMQInputOperatorTest
startTms = System.currentTimeMillis();
while (System.currentTimeMillis() - startTms < timeout) {
List<?> list = collector.inputPort.collections.get("collector");
-
+
if (list.size() < testNum * 3) {
Thread.sleep(10);
- }
- else {
+ } else {
break;
}
}
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
logger.error(ex.getMessage(), ex);
DTThrowable.rethrow(ex);
} catch (InterruptedException ex) {
@@ -179,5 +179,53 @@ public class RabbitMQInputOperatorTest
logger.debug("collection size: {} {}", collector.inputPort.collections.size(), collector.inputPort.collections);
MessageQueueTestHelper.validateResults(testNum, collector.inputPort.collections);
- }
+ }
+
+ @Test
+ public void testRecoveryAndIdempotency() throws Exception
+ {
+ RabbitMQInputOperator operator = new RabbitMQInputOperator();
+ operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+ operator.setHost("localhost");
+ operator.setExchange("testEx");
+ operator.setExchangeType("fanout");
+
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+ operator.outputPort.setSink(sink);
+ OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+
+ operator.setup(context);
+ operator.activate(context);
+
+ final RabbitMQMessageGenerator publisher = new RabbitMQMessageGenerator();
+ publisher.setup();
+ publisher.generateMessages(5);
+
+ Thread.sleep(10000);
+
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.endWindow();
+
+ operator.deactivate();
+ Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size());
+
+ // failure and then re-deployment of operator
+ sink.collectedTuples.clear();
+ operator.setup(context);
+ operator.activate(context);
+
+ Assert.assertEquals("largest recovery window", 1, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+ operator.beginWindow(1);
+ operator.endWindow();
+ Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size());
+ sink.collectedTuples.clear();
+
+ operator.deactivate();
+ operator.teardown();
+ operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 1);
+ publisher.teardown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
index a170a0e..27213c3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
import org.slf4j.LoggerFactory;
import com.datatorrent.contrib.helper.SourceModule;
-
+import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
@@ -45,7 +45,7 @@ public class RabbitMQOutputOperatorTest
public int count = 0;
private final String host = "localhost";
ConnectionFactory connFactory = new ConnectionFactory();
-// QueueingConsumer consumer = null;
+ // QueueingConsumer consumer = null;
Connection connection = null;
Channel channel = null;
TracingConsumer tracingConsumer = null;
@@ -64,8 +64,6 @@ public class RabbitMQOutputOperatorTest
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, "");
-// consumer = new QueueingConsumer(channel);
-// channel.basicConsume(queueName, true, consumer);
tracingConsumer = new TracingConsumer(channel);
cTag = channel.basicConsume(queueName, true, tracingConsumer);
}
@@ -125,7 +123,6 @@ public class RabbitMQOutputOperatorTest
}
}
-
@Test
public void testDag() throws InterruptedException, MalformedURLException, IOException, Exception
{
@@ -133,7 +130,7 @@ public class RabbitMQOutputOperatorTest
runTest(testNum);
logger.debug("end of test");
}
-
+
protected void runTest(int testNum) throws IOException
{
RabbitMQMessageReceiver receiver = new RabbitMQMessageReceiver();
@@ -144,23 +141,22 @@ public class RabbitMQOutputOperatorTest
SourceModule source = dag.addOperator("source", new SourceModule());
source.setTestNum(testNum);
RabbitMQOutputOperator collector = dag.addOperator("generator", new RabbitMQOutputOperator());
-
+ collector.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
collector.setExchange("testEx");
dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
final LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
lc.runAsync();
- try {
+ try {
Thread.sleep(1000);
long timeout = 10000L;
long startTms = System.currentTimeMillis();
- while((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout))
- {
+ while ((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout)) {
Thread.sleep(100);
- }
- }
- catch (InterruptedException ex) {
+ }
+ } catch (InterruptedException ex) {
Assert.fail(ex.getMessage());
} finally {
lc.shutdown();
@@ -170,11 +166,9 @@ public class RabbitMQOutputOperatorTest
for (Map.Entry<String, Integer> e : receiver.dataMap.entrySet()) {
if (e.getKey().equals("a")) {
Assert.assertEquals("emitted value for 'a' was ", new Integer(2), e.getValue());
- }
- else if (e.getKey().equals("b")) {
+ } else if (e.getKey().equals("b")) {
Assert.assertEquals("emitted value for 'b' was ", new Integer(20), e.getValue());
- }
- else if (e.getKey().equals("c")) {
+ } else if (e.getKey().equals("c")) {
Assert.assertEquals("emitted value for 'c' was ", new Integer(1000), e.getValue());
}
}
[07/25] incubator-apex-malhar git commit: Merge pull request #1531
from chandnisingh/feature-bugFixes-3.1.0
Posted by da...@apache.org.
Merge pull request #1531 from chandnisingh/feature-bugFixes-3.1.0
omitting type from FieldInfo
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/8e94665e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/8e94665e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/8e94665e
Branch: refs/heads/feature-AppData
Commit: 8e94665e5430225a609b475e465830d8e19ea872
Parents: f40ba34 f6d85fe
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Aug 10 14:13:26 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Aug 10 14:13:26 2015 -0700
----------------------------------------------------------------------
library/src/main/java/com/datatorrent/lib/util/FieldInfo.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
[23/25] incubator-apex-malhar git commit: Fixed backwards
compatibility errors.
Posted by da...@apache.org.
Fixed backwards compatibility errors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/59622483
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/59622483
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/59622483
Branch: refs/heads/feature-AppData
Commit: 59622483ebeccbbbf37fb428020fddf38c14627e
Parents: b24e99f
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Fri Aug 28 15:47:52 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:32 2015 -0700
----------------------------------------------------------------------
demos/pom.xml | 2 +-
.../query/serde/DataQuerySnapshotValidator.java | 2 +-
.../lib/appdata/schemas/DataQuerySnapshot.java | 7 ++++-
.../lib/appdata/schemas/SchemaUtils.java | 27 ++++++++++++++++++++
.../lib/io/PubSubWebSocketAppDataQuery.java | 8 +++---
.../lib/io/PubSubWebSocketAppDataResult.java | 6 ++---
.../lib/io/WebSocketInputOperator.java | 2 +-
.../lib/io/WebSocketOutputOperator.java | 2 +-
pom.xml | 4 +--
9 files changed, 46 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 7976aa0..2ef2fa9 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -28,7 +28,7 @@
</modules>
<properties>
- <datatorrent.version>3.1.0-SNAPSHOT</datatorrent.version>
+ <datatorrent.version>3.2.0-SNAPSHOT</datatorrent.version>
<datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
<semver.plugin.skip>true</semver.plugin.skip>
</properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
index d2b7ef0..2eb788d 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
@@ -55,7 +55,7 @@ public class DataQuerySnapshotValidator implements CustomMessageValidator
}
if (gdqt.getFields().getFields().isEmpty()) {
- gdqt.setFields(new Fields(fields));
+ gdqt.setFieldsVal(new Fields(fields));
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
index 5714597..8e68380 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
@@ -136,12 +136,17 @@ public class DataQuerySnapshot extends Query
* Sets the fields of the query.
* @param fields The fields of the query.
*/
- public final void setFields(Fields fields)
+ private void setFields(Fields fields)
{
Preconditions.checkNotNull(fields);
this.fields = fields;
}
+ public void setFieldsVal(Fields fields)
+ {
+ setFields(fields);
+ }
+
/**
* Gets the fields of the query.
* @return The fields of the query.
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
index ce68c7f..de3c013 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
@@ -21,6 +21,7 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -137,6 +138,13 @@ public class SchemaUtils
public static boolean checkValidKeys(JSONObject jo,
Collection<Fields> fieldsCollection)
{
+ return checkValidKeysHelper(jo,
+ fieldsCollection);
+ }
+
+ private static boolean checkValidKeysHelper(JSONObject jo,
+ Collection<Fields> fieldsCollection)
+ {
for (Fields fields: fieldsCollection) {
LOG.debug("Checking keys: {}", fields);
if (checkValidKeys(jo, fields)) {
@@ -151,6 +159,12 @@ public class SchemaUtils
return false;
}
+ public static boolean checkValidKeys(JSONObject jo, List<Fields> fieldsCollection)
+ {
+ return checkValidKeysHelper(jo,
+ fieldsCollection);
+ }
+
/**
* This is a utility method to check that the given JSONObject has the given keys.
* It throws an {@link IllegalArgumentException} if it doesn't contain all the given keys.
@@ -161,6 +175,13 @@ public class SchemaUtils
public static boolean checkValidKeysEx(JSONObject jo,
Collection<Fields> fieldsCollection)
{
+ return checkValidKeysExHelper(jo,
+ fieldsCollection);
+ }
+
+ public static boolean checkValidKeysExHelper(JSONObject jo,
+ Collection<Fields> fieldsCollection)
+ {
for (Fields fields: fieldsCollection) {
if (checkValidKeys(jo, fields)) {
return true;
@@ -175,6 +196,12 @@ public class SchemaUtils
fieldsCollection);
}
+ public static boolean checkValidKeysEx(JSONObject jo, List<Fields> fieldsCollection)
+ {
+ return checkValidKeysExHelper(jo,
+ fieldsCollection);
+ }
+
public static Set<String> getSetOfJSONKeys(JSONObject jo)
{
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 031befd..889a4d3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -68,7 +68,7 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
@Override
public void setup(OperatorContext context)
{
- this.uri = uriHelper(context, uri);
+ setUri(uriHelper(context, getUri()));
logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
super.setup(context);
@@ -83,7 +83,7 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
-
+
if (windowBoundedService != null) {
windowBoundedService.beginWindow(windowId);
}
@@ -136,7 +136,7 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
@Override
public URI getUri()
{
- return uri;
+ return super.getUri();
}
/**
@@ -148,7 +148,7 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
@Override
public void setUri(URI uri)
{
- this.uri = uri;
+ super.setUri(uri);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
index 5f0b947..cdae7b8 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
@@ -48,7 +48,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
@Override
public void setup(OperatorContext context)
{
- this.uri = PubSubWebSocketAppDataQuery.uriHelper(context, uri);
+ setUri(PubSubWebSocketAppDataQuery.uriHelper(context, getUri()));
logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
super.setup(context);
}
@@ -67,7 +67,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
@Override
public URI getUri()
{
- return uri;
+ return super.getUri();
}
/**
@@ -79,7 +79,7 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
@Override
public void setUri(URI uri)
{
- this.uri = uri;
+ super.setUri(uri);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index 69ebfa3..02b9ef2 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -53,7 +53,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
*/
public int readTimeoutMillis = 0;
//Do not make this @NotNull since null is a valid value for some child classes
- protected URI uri;
+ private URI uri;
private transient AsyncHttpClient client;
private transient final JsonFactory jsonFactory = new JsonFactory();
protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
index f46ccb8..a0cf465 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
@@ -49,7 +49,7 @@ public class WebSocketOutputOperator<T> extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(WebSocketOutputOperator.class);
//Do not make this @NotNull since null is a valid value for some child classes
- protected URI uri;
+ private URI uri;
private transient AsyncHttpClient client;
private transient final JsonFactory jsonFactory = new JsonFactory();
protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/59622483/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e9dbcd6..1468163 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>dt-framework</artifactId>
- <version>3.1.0-SNAPSHOT</version>
+ <version>3.2.0-SNAPSHOT</version>
</parent>
<artifactId>malhar</artifactId>
@@ -38,7 +38,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netbeans.hint.license>malhar-inc</netbeans.hint.license>
<maven.deploy.skip>false</maven.deploy.skip>
- <dt.framework.version>3.1.0-SNAPSHOT</dt.framework.version>
+ <dt.framework.version>3.2.0-SNAPSHOT</dt.framework.version>
<!-- the following properties match the properties defined in core/pom.xml -->
<jackson.version>1.9.2</jackson.version>
<jersey.version>1.9</jersey.version>
[10/25] incubator-apex-malhar git commit: Merge branch
'ishark-rabbitMQ' into v3.1.0
Posted by da...@apache.org.
Merge branch 'ishark-rabbitMQ' into v3.1.0
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0b3bb88d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0b3bb88d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0b3bb88d
Branch: refs/heads/feature-AppData
Commit: 0b3bb88d8ebd2e52d483f77f7951eb56767f32df
Parents: 8e94665 4dc4788
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Wed Aug 12 21:08:44 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Wed Aug 12 21:08:44 2015 -0700
----------------------------------------------------------------------
.../rabbitmq/AbstractRabbitMQInputOperator.java | 142 +++++++++++++++++--
.../AbstractRabbitMQOutputOperator.java | 65 ++++++++-
...bstractSinglePortRabbitMQOutputOperator.java | 5 +-
.../rabbitmq/RabbitMQInputOperatorTest.java | 86 ++++++++---
.../rabbitmq/RabbitMQOutputOperatorTest.java | 28 ++--
5 files changed, 278 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
[15/25] incubator-apex-malhar git commit: Fix version compatibility
issue for Abstract redis store input operator. Reverted back to original
super class
Posted by da...@apache.org.
Fix version compatibility issue for Abstract redis store input operator. Reverted back to original super class
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/731b8bbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/731b8bbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/731b8bbe
Branch: refs/heads/feature-AppData
Commit: 731b8bbe3b09ac54a5c5a985e1e76667cc417002
Parents: 0b31fee
Author: ishark <is...@datatorrent.com>
Authored: Tue Aug 18 13:36:21 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Tue Aug 18 13:36:21 2015 -0700
----------------------------------------------------------------------
.../contrib/redis/AbstractRedisInputOperator.java | 7 +++++--
.../contrib/redis/RedisKeyValueInputOperator.java | 9 +++++++++
.../contrib/redis/RedisMapAsValueInputOperator.java | 8 ++++++++
.../datatorrent/contrib/redis/RedisPOJOInputOperator.java | 7 +++++++
4 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index 260fbf6..5e62dbb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -19,13 +19,16 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+
import javax.validation.constraints.NotNull;
+
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
+
import com.datatorrent.api.Operator.CheckpointListener;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
import com.datatorrent.lib.io.IdempotentStorageManager;
/**
@@ -39,7 +42,7 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
* The tuple type.
* @since 0.9.3
*/
-public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
+public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointListener
{
protected transient List<String> keys = new ArrayList<String>();
protected transient Integer scanOffset;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
index 8f419bd..0d0efe8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
@@ -18,6 +18,8 @@ package com.datatorrent.contrib.redis;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+
import com.datatorrent.lib.util.KeyValPair;
/**
@@ -52,4 +54,11 @@ public class RedisKeyValueInputOperator extends AbstractRedisInputOperator<KeyVa
keysObjectList.clear();
}
}
+
+ @Override
+ public KeyValPair<String, String> convertToTuple(Map<Object, Object> o)
+ {
+ // Do nothing for the override, Scan already done in processTuples
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
index 66ef582..a7f0cd2 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
@@ -16,6 +16,7 @@
package com.datatorrent.contrib.redis;
import java.util.Map;
+
import com.datatorrent.lib.util.KeyValPair;
/**
@@ -42,4 +43,11 @@ public class RedisMapAsValueInputOperator extends AbstractRedisInputOperator<Key
}
keys.clear();
}
+
+ @Override
+ public KeyValPair<String, Map<String, String>> convertToTuple(Map<Object, Object> o)
+ {
+ // Do nothing for the override, Emit already handled in processTuples
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
index 5a73e61..ac3f7fc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
@@ -201,4 +201,11 @@ public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPai
{
this.dataColumns = dataColumns;
}
+
+ @Override
+ public KeyValPair<String, Object> convertToTuple(Map<Object, Object> o)
+ {
+ // Do nothing for the override, Scan already done in processTuples
+ return null;
+ }
}
[11/25] incubator-apex-malhar git commit: MLHR-1748 #resolve Created
concrete input and output operators for Redis Store Added test cases for the
same.
Posted by da...@apache.org.
MLHR-1748 #resolve Created concrete input and output operators for Redis Store
Added test cases for the same.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a57a3d75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a57a3d75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a57a3d75
Branch: refs/heads/feature-AppData
Commit: a57a3d756bafc1269c827a71d4fea549abdf65dd
Parents: f40ba34
Author: ishark <is...@datatorrent.com>
Authored: Mon Jun 29 15:52:25 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 14 12:07:36 2015 -0700
----------------------------------------------------------------------
contrib/pom.xml | 8 +-
.../redis/AbstractRedisInputOperator.java | 224 +++++++++++++++++-
.../redis/RedisKeyValueInputOperator.java | 55 +++++
.../redis/RedisMapAsValueInputOperator.java | 45 ++++
.../contrib/redis/RedisPOJOInputOperator.java | 204 ++++++++++++++++
.../contrib/redis/RedisPOJOOutputOperator.java | 155 +++++++++++++
.../datatorrent/contrib/redis/RedisStore.java | 27 +++
.../contrib/redis/RedisInputOperatorTest.java | 193 ++++++++++++++++
.../contrib/redis/RedisPOJOOperatorTest.java | 230 +++++++++++++++++++
demos/machinedata/pom.xml | 2 +-
10 files changed, 1138 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 9776e2f..50d7234 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -181,6 +181,12 @@
<dependencies>
<dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
@@ -382,7 +388,7 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
- <version>2.2.1</version>
+ <version>2.5.1</version>
<optional>true</optional>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index ff7a9a5..7f79bd0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -15,11 +15,22 @@
*/
package com.datatorrent.contrib.redis;
-import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.netlet.util.DTThrowable;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
/**
* This is the base implementation of a Redis input operator.
- * <p></p>
+ *
* @displayName Abstract Redis Input
* @category Input
* @tags redis, key value
@@ -27,6 +38,213 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
* @param <T> The tuple type.
* @since 0.9.3
*/
-public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore>
+public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener
{
+ protected transient List<String> keys = new ArrayList<String>();
+ protected transient Integer scanOffset;
+ protected transient ScanParams scanParameters;
+ private transient boolean scanComplete;
+ private transient Integer backupOffset;
+ private int scanCount;
+ private transient boolean replay;
+
+ @NotNull
+ private IdempotentStorageManager idempotentStorageManager;
+
+ private transient OperatorContext context;
+ private transient long currentWindowId;
+ private transient Integer sleepTimeMillis;
+ private transient Integer scanCallsInCurrentWindow;
+ private RecoveryState recoveryState;
+
+ /*
+ * Recovery State contains last offset processed in window and number of times
+ * ScanKeys was invoked in window We need to capture to capture number of
+ * calls to ScanKeys because, last offset returned by scanKeys call is not
+ * always monotonically increasing. Storing offset and number of times scan
+ * was done for each window, guarantees idempotency for each window
+ */
+ public static class RecoveryState implements Serializable
+ {
+ public Integer scanOffsetAtBeginWindow, numberOfScanCallsInWindow;
+ }
+
+ public AbstractRedisInputOperator()
+ {
+ scanCount = 100;
+ recoveryState = new RecoveryState();
+ recoveryState.scanOffsetAtBeginWindow = 0;
+ recoveryState.numberOfScanCallsInWindow = 0;
+ setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ scanCallsInCurrentWindow = 0;
+ replay = false;
+ if (currentWindowId <= getIdempotentStorageManager().getLargestRecoveryWindow()) {
+ replay(windowId);
+ }
+ }
+
+ private void replay(long windowId)
+ {
+ try {
+ if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) {
+ // Begin offset for this window is recovery offset stored for the last
+ // window
+ RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1);
+ recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
+ }
+
+ RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId);
+ recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
+ if (recoveryState.scanOffsetAtBeginWindow != null) {
+ scanOffset = recoveryState.scanOffsetAtBeginWindow;
+ }
+ replay = true;
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException
+ {
+ long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId());
+ if(windowsIds.length == 0 || windowId < windowsIds[0] || windowId > windowsIds[windowsIds.length - 1]) {
+ return false;
+ }
+ return true ;
+ }
+
+ private void scanKeysFromOffset()
+ {
+ if (!scanComplete) {
+ if (replay && scanCallsInCurrentWindow >= recoveryState.numberOfScanCallsInWindow) {
+ try {
+ Thread.sleep(sleepTimeMillis);
+ } catch (InterruptedException e) {
+ DTThrowable.rethrow(e);
+ }
+ return;
+ }
+
+ ScanResult<String> result = store.ScanKeys(scanOffset, scanParameters);
+ backupOffset = scanOffset;
+ scanOffset = Integer.parseInt(result.getStringCursor());
+ if (scanOffset == 0) {
+ // Redis store returns 0 after all data is read
+ scanComplete = true;
+
+ // point scanOffset to the end in this case for reading any new tuples
+ scanOffset = backupOffset + result.getResult().size();
+ }
+ keys = result.getResult();
+ }
+ scanCallsInCurrentWindow++;
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ sleepTimeMillis = context.getValue(context.SPIN_MILLIS);
+ getIdempotentStorageManager().setup(context);
+ this.context = context;
+ scanOffset = 0;
+ scanComplete = false;
+ scanParameters = new ScanParams();
+ scanParameters.count(scanCount);
+ // For the 1st window after checkpoint, windowID - 1 would not have recovery
+ // offset stored in idempotentStorageManager
+ // But recoveryOffset is non-transient, so will be recovered with
+ // checkPointing
+ scanOffset = recoveryState.scanOffsetAtBeginWindow;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ while (replay && scanCallsInCurrentWindow < recoveryState.numberOfScanCallsInWindow) {
+ // If less keys got scanned in this window, scan till recovery offset
+ scanKeysFromOffset();
+ processTuples();
+ }
+ super.endWindow();
+ recoveryState.scanOffsetAtBeginWindow = scanOffset;
+ recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow;
+
+ if (currentWindowId > getIdempotentStorageManager().getLargestRecoveryWindow()) {
+ try {
+ getIdempotentStorageManager().save(recoveryState, context.getId(), currentWindowId);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ super.teardown();
+ getIdempotentStorageManager().teardown();
+ }
+
+ /*
+ * get number of keys to read for each redis key scan
+ */
+ public int getScanCount()
+ {
+ return scanCount;
+ }
+
+ /*
+ * set number of keys to read for each redis key scan
+ */
+ public void setScanCount(int scanCount)
+ {
+ this.scanCount = scanCount;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ scanKeysFromOffset();
+ processTuples();
+ }
+
+ abstract public void processTuples();
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ getIdempotentStorageManager().deleteUpTo(context.getId(), windowId);
+ } catch (IOException e) {
+ throw new RuntimeException("committing", e);
+ }
+ }
+
+ /*
+ * get Idempotent Storage manager instance
+ */
+ public IdempotentStorageManager getIdempotentStorageManager()
+ {
+ return idempotentStorageManager;
+ }
+
+ /*
+ * set Idempotent storage manager instance
+ */
+ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+ {
+ this.idempotentStorageManager = idempotentStorageManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
new file mode 100644
index 0000000..8f419bd
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is the an implementation of a Redis input operator for fetching
+ * Key-Value pair stored in Redis. It takes in keys to fetch and emits
+ * corresponding <Key, Value> Pair. Value data type is String in this case.
+ *
+ * @displayName Redis Input Operator for Key Value pair
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+public class RedisKeyValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, String>>
+{
+ private List<Object> keysObjectList = new ArrayList<Object>();
+
+ @Override
+ public void processTuples()
+ {
+ keysObjectList = new ArrayList<Object>(keys);
+ if (keysObjectList.size() > 0) {
+
+ List<Object> allValues = store.getAll(keysObjectList);
+ for (int i = 0; i < allValues.size() && i < keys.size(); i++) {
+ if (allValues.get(i) == null) {
+ outputPort.emit(new KeyValPair<String, String>(keys.get(i), null));
+ } else {
+ outputPort.emit(new KeyValPair<String, String>(keys.get(i), allValues.get(i).toString()));
+ }
+ }
+ keys.clear();
+ keysObjectList.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
new file mode 100644
index 0000000..66ef582
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.Map;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is the an implementation of a Redis input operator It takes in keys to
+ * fetch and emits Values stored as Maps in Redis i.e. when value datatype in
+ * Redis is HashMap
+ *
+ * @displayName Redis Input Operator for Map
+ * @category Store
+ * @tags input operator, key value
+ *
+ */
+
+public class RedisMapAsValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Map<String, String>>>
+{
+ @Override
+ public void processTuples()
+ {
+ for (String key : keys) {
+ if (store.getType(key).equals("hash")) {
+ Map<String, String> mapValue = store.getMap(key);
+ outputPort.emit(new KeyValPair<String, Map<String, String>>(key, mapValue));
+ }
+ }
+ keys.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
new file mode 100644
index 0000000..5a73e61
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java
@@ -0,0 +1,204 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Setter;
+import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.SetterDouble;
+import com.datatorrent.lib.util.PojoUtils.SetterFloat;
+import com.datatorrent.lib.util.PojoUtils.SetterInt;
+import com.datatorrent.lib.util.PojoUtils.SetterLong;
+import com.datatorrent.lib.util.PojoUtils.SetterShort;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is a Redis input operator, which scans all keys in Redis store It
+ * converts Value stored as map to Plain Old Java Object. It outputs
+ * KeyValuePair with POJO as value
+ * <p>
+ * This output adapter Reads from RedisStore stored as <Key, Map> It outputs a
+ * Key value pair <key, POJO> as tuples.
+ * </p>
+ *
+ * @displayName Redis POJO Input Operator
+ * @category Store
+ * @tags output operator, key value
+ *
+ */
+@Evolving
+public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Object>>
+{
+ protected final Map<String, Object> map = new HashMap<String, Object>();
+ private ArrayList<FieldInfo> dataColumns;
+ private transient ArrayList<Object> setters;
+ private boolean isFirstTuple = true;
+ private String outputClass;
+ private Class<?> objectClass;
+
+ public RedisPOJOInputOperator()
+ {
+ super();
+ setters = new ArrayList<Object>();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object convertMapToObject(Map<String, String> tuple)
+ {
+ try {
+ Object mappedObject = objectClass.newInstance();
+ for (int i = 0; i < dataColumns.size(); i++) {
+ final SupportType type = dataColumns.get(i).getType();
+ final String columnName = dataColumns.get(i).getColumnName();
+
+ if (i < setters.size()) {
+ String value = tuple.get(columnName);
+ switch (type) {
+ case STRING:
+ ((Setter<Object, String>) setters.get(i)).set(mappedObject, value);
+ break;
+ case BOOLEAN:
+ ((SetterBoolean) setters.get(i)).set(mappedObject, Boolean.parseBoolean(value));
+ break;
+ case SHORT:
+ ((SetterShort) setters.get(i)).set(mappedObject, Short.parseShort(value));
+ break;
+ case INTEGER:
+ ((SetterInt) setters.get(i)).set(mappedObject, Integer.parseInt(value));
+ break;
+ case LONG:
+ ((SetterLong) setters.get(i)).set(mappedObject, Long.parseLong(value));
+ break;
+ case FLOAT:
+ ((SetterFloat) setters.get(i)).set(mappedObject, Float.parseFloat(value));
+ break;
+ case DOUBLE:
+ ((SetterDouble) setters.get(i)).set(mappedObject, Double.parseDouble(value));
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ return mappedObject;
+ } catch (Exception e) {
+ DTThrowable.wrapIfChecked(e);
+ }
+ return null;
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ }
+
+ public void processFirstTuple(Map<String, String> value) throws ClassNotFoundException
+ {
+ objectClass = Class.forName(getOutputClass());
+
+ final int size = dataColumns.size();
+ for (int i = 0; i < size; i++) {
+ final SupportType type = dataColumns.get(i).getType();
+ final String getterExpression = dataColumns.get(i).getPojoFieldExpression();
+ final Object setter;
+ switch (type) {
+ case STRING:
+ setter = PojoUtils.createSetter(objectClass, getterExpression, String.class);
+ break;
+ case BOOLEAN:
+ setter = PojoUtils.createSetterBoolean(objectClass, getterExpression);
+ break;
+ case SHORT:
+ setter = PojoUtils.createSetterShort(objectClass, getterExpression);
+ break;
+ case INTEGER:
+ setter = PojoUtils.createSetterInt(objectClass, getterExpression);
+ break;
+ case LONG:
+ setter = PojoUtils.createSetterLong(objectClass, getterExpression);
+ break;
+ case FLOAT:
+ setter = PojoUtils.createSetterFloat(objectClass, getterExpression);
+ break;
+ case DOUBLE:
+ setter = PojoUtils.createSetterDouble(objectClass, getterExpression);
+ break;
+ default:
+ setter = PojoUtils.createSetter(objectClass, getterExpression, Object.class);
+ break;
+ }
+ setters.add(setter);
+ }
+ }
+
+ @Override
+ public void processTuples()
+ {
+ for (String key : keys) {
+ if (store.getType(key).equals("hash")) {
+ Map<String, String> mapValue = store.getMap(key);
+ if (isFirstTuple) {
+ try {
+ processFirstTuple(mapValue);
+ } catch (ClassNotFoundException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+ isFirstTuple = false;
+ outputPort.emit(new KeyValPair<String, Object>(key, convertMapToObject(mapValue)));
+ }
+ }
+ keys.clear();
+ }
+
+ /*
+ * Output class type
+ */
+ public String getOutputClass()
+ {
+ return outputClass;
+ }
+
+ public void setOutputClass(String outputClass)
+ {
+ this.outputClass = outputClass;
+ }
+
+ /*
+ * An arraylist of data column names to be set in Redis store as a Map. Gets
+ * column names, column expressions and column data types
+ */
+ public ArrayList<FieldInfo> getDataColumns()
+ {
+ return dataColumns;
+ }
+
+ public void setDataColumns(ArrayList<FieldInfo> dataColumns)
+ {
+ this.dataColumns = dataColumns;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
new file mode 100644
index 0000000..8966248
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
+/**
+ * This is a Redis output operator, which takes a Key and corresponding Plain
+ * Old Java Object as input. And writes a Map out to Redis based on Expressions
+ * provided.
+ * <p>
+ * This output adapter takes a Key value pair <key, POJO> as tuples and just
+ * writes to the redis store with the key and the value is a Map containing
+ * object attributes as <keys,value> Note: Redis output operator should never
+ * use the passthrough method because it begins a transaction at beginWindow and
+ * commits a transaction at endWindow, and a transaction in Redis blocks all
+ * other clients.
+ * </p>
+ *
+ * @displayName Redis POJO Output Operator
+ * @category Store
+ * @tags output operator, key value
+ *
+ */
+public class RedisPOJOOutputOperator extends AbstractRedisAggregateOutputOperator<KeyValPair<String, Object>>
+{
+ protected final Map<String, Object> map = new HashMap<String, Object>();
+ private ArrayList<FieldInfo> dataColumns;
+ private transient ArrayList<Object> getters;
+ private boolean isFirstTuple = true;
+
+ public RedisPOJOOutputOperator()
+ {
+ super();
+ getters = new ArrayList<Object>();
+ }
+
+ @Override
+ public void storeAggregate()
+ {
+ for (Entry<String, Object> entry : map.entrySet()) {
+
+ Map<String, String> mapObject = convertObjectToMap(entry.getValue());
+ store.put(entry.getKey(), mapObject);
+ }
+ }
+
+ private Map<String, String> convertObjectToMap(Object tuple)
+ {
+
+ Map<String, String> mappedObject = new HashMap<String, String>();
+ for (int i = 0; i < dataColumns.size(); i++) {
+ final SupportType type = dataColumns.get(i).getType();
+ final String columnName = dataColumns.get(i).getColumnName();
+
+ if (i < getters.size()) {
+ Getter<Object, Object> obj = (Getter<Object, Object>) (getters.get(i));
+
+ Object value = obj.get(tuple);
+ mappedObject.put(columnName, value.toString());
+ }
+ }
+
+ return mappedObject;
+ }
+
+ public void processFirstTuple(KeyValPair<String, Object> tuple)
+ {
+ // Create getters using first value entry in map
+ // Entry<String, Object> entry= tuple.entrySet().iterator().next();
+ Object value = tuple.getValue();
+
+ final Class<?> fqcn = value.getClass();
+ final int size = dataColumns.size();
+ for (int i = 0; i < size; i++) {
+ final SupportType type = dataColumns.get(i).getType();
+ final String getterExpression = dataColumns.get(i).getPojoFieldExpression();
+ final Object getter;
+ switch (type) {
+ case STRING:
+ getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+ break;
+ case BOOLEAN:
+ getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+ break;
+ case SHORT:
+ getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+ break;
+ case INTEGER:
+ getter = PojoUtils.createGetter(fqcn, getterExpression, type.getJavaType());
+ break;
+ case LONG:
+ getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+ break;
+ case FLOAT:
+ getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+ break;
+ case DOUBLE:
+ getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+ break;
+ default:
+ getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+ break;
+ }
+ getters.add(getter);
+ }
+ }
+
+ @Override
+ public void processTuple(KeyValPair<String, Object> tuple)
+ {
+ if (isFirstTuple) {
+ processFirstTuple(tuple);
+ }
+
+ isFirstTuple = false;
+ map.put(tuple.getKey(), tuple.getValue());
+ }
+
+ /*
+ * An arraylist of data column names to be set in Redis store as a Map. Gets
+ * column names, column expressions and column data types
+ */
+ public ArrayList<FieldInfo> getDataColumns()
+ {
+ return dataColumns;
+ }
+
+ public void setDataColumns(ArrayList<FieldInfo> dataColumns)
+ {
+ this.dataColumns = dataColumns;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
index ea8e26b..2acc1d5 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java
@@ -23,6 +23,8 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
import redis.clients.jedis.Transaction;
import com.datatorrent.lib.db.TransactionableKeyValueStore;
@@ -181,6 +183,26 @@ public class RedisStore implements TransactionableKeyValueStore
return jedis.get(key.toString());
}
+ public String getType(String key)
+ {
+ return jedis.type(key);
+ }
+
+ /**
+ * Gets the stored Map for given the key, when the value data type is a map, stored with hmset
+ *
+ * @param key
+ * @return hashmap stored for the key.
+ */
+ public Map<String, String> getMap(Object key)
+ {
+ if (isInTransaction()) {
+ throw new RuntimeException("Cannot call get when in redis transaction");
+ }
+ return jedis.hgetAll(key.toString());
+ }
+
+
/**
* Gets all the values given the keys.
* Note that it does NOT work with hash values or list values
@@ -255,6 +277,11 @@ public class RedisStore implements TransactionableKeyValueStore
}
}
+ public ScanResult<String> ScanKeys(Integer offset, ScanParams params)
+ {
+ return jedis.scan(offset.toString(), params);
+ }
+
/**
* Calls hincrbyfloat on the redis store.
*
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
new file mode 100644
index 0000000..08fb294
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
@@ -0,0 +1,193 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RedisInputOperatorTest
+{
+ private RedisStore operatorStore;
+ private RedisStore testStore;
+
+ public static class CollectorModule extends BaseOperator
+ {
+ volatile static List<KeyValPair<String, String>> resultMap = new ArrayList<KeyValPair<String, String>>();
+ static long resultCount = 0;
+
+ public final transient DefaultInputPort<KeyValPair<String, String>> inputPort = new DefaultInputPort<KeyValPair<String, String>>()
+ {
+ @Override
+ public void process(KeyValPair<String, String> tuple)
+ {
+ resultMap.add(tuple);
+ resultCount++;
+ }
+ };
+ }
+
+ @Test
+ public void testIntputOperator() throws IOException
+ {
+ this.operatorStore = new RedisStore();
+ this.testStore = new RedisStore();
+
+ testStore.connect();
+ ScanParams params = new ScanParams();
+ params.count(1);
+
+ testStore.put("test_abc", "789");
+ testStore.put("test_def", "456");
+ testStore.put("test_ghi", "123");
+
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ RedisKeyValueInputOperator inputOperator = dag.addOperator("input", new RedisKeyValueInputOperator());
+ final CollectorModule collector = dag.addOperator("collector", new CollectorModule());
+
+ inputOperator.setStore(operatorStore);
+ dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+ final LocalMode.Controller lc = lma.getController();
+
+ new Thread("LocalClusterController")
+ {
+ @Override
+ public void run()
+ {
+ long startTms = System.currentTimeMillis();
+ long timeout = 50000L;
+ try {
+ Thread.sleep(1000);
+ while (System.currentTimeMillis() - startTms < timeout) {
+ if (CollectorModule.resultMap.size() < 3) {
+ Thread.sleep(10);
+ } else {
+ break;
+ }
+ }
+ } catch (InterruptedException ex) {
+ }
+ lc.shutdown();
+ }
+ }.start();
+
+ lc.run();
+
+ Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_abc", "789")));
+ Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_def", "456")));
+ Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_ghi", "123")));
+ } finally {
+ for (KeyValPair<String, String> entry : CollectorModule.resultMap) {
+ testStore.remove(entry.getKey());
+ }
+ testStore.disconnect();
+ }
+ }
+
+ @Test
+ public void testRecoveryAndIdempotency() throws Exception
+ {
+ this.operatorStore = new RedisStore();
+ this.testStore = new RedisStore();
+
+ testStore.connect();
+ ScanParams params = new ScanParams();
+ params.count(1);
+
+ testStore.put("test_abc", "789");
+ testStore.put("test_def", "456");
+ testStore.put("test_ghi", "123");
+
+ RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator();
+ operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
+ operator.setStore(operatorStore);
+ operator.setScanCount(1);
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+ operator.outputPort.setSink(sink);
+ OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+
+ try {
+ operator.setup(context);
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.endWindow();
+
+ int numberOfMessagesInWindow1 = sink.collectedTuples.size();
+ sink.collectedTuples.clear();
+
+ operator.beginWindow(2);
+ operator.emitTuples();
+ operator.endWindow();
+ int numberOfMessagesInWindow2 = sink.collectedTuples.size();
+ sink.collectedTuples.clear();
+
+ // failure and then re-deployment of operator
+ // Re-instantiating to reset values
+ operator = new RedisKeyValueInputOperator();
+ operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+ operator.setStore(operatorStore);
+ operator.setScanCount(1);
+ operator.outputPort.setSink(sink);
+ operator.setup(context);
+
+ Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.emitTuples();
+ operator.endWindow();
+
+ Assert.assertEquals("num of messages in window 1", numberOfMessagesInWindow1, sink.collectedTuples.size());
+
+ sink.collectedTuples.clear();
+ operator.beginWindow(2);
+ operator.emitTuples();
+ operator.endWindow();
+ Assert.assertEquals("num of messages in window 2",numberOfMessagesInWindow2, sink.collectedTuples.size());
+ } finally {
+ for (Object e : sink.collectedTuples) {
+ KeyValPair<String, String> entry = (KeyValPair<String, String>) e;
+ testStore.remove(entry.getKey());
+ }
+ sink.collectedTuples.clear();
+ operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 5);
+ operator.teardown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
new file mode 100644
index 0000000..7792b5a
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java
@@ -0,0 +1,230 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.redis.RedisInputOperatorTest.CollectorModule;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class RedisPOJOOperatorTest
+{
+ private RedisStore operatorStore;
+ private RedisStore testStore;
+
+ public static class TestClass
+ {
+ private Integer intValue;
+ private String stringValue;
+
+ public TestClass()
+ {
+ }
+
+ public TestClass(int v1, String v2)
+ {
+ intValue = v1;
+ stringValue = v2;
+ }
+
+ public Integer getIntValue()
+ {
+ return intValue;
+ }
+
+ public void setIntValue(int intValue)
+ {
+ this.intValue = intValue;
+ }
+
+ public String getStringValue()
+ {
+ return stringValue;
+ }
+
+ public void setStringValue(String stringValue)
+ {
+ this.stringValue = stringValue;
+ }
+ }
+
+ @Test
+ public void testOutputOperator() throws IOException
+ {
+ this.operatorStore = new RedisStore();
+
+ operatorStore.connect();
+ String appId = "test_appid";
+ int operatorId = 0;
+
+ operatorStore.removeCommittedWindowId(appId, operatorId);
+ operatorStore.disconnect();
+
+ RedisPOJOOutputOperator outputOperator = new RedisPOJOOutputOperator();
+
+ ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+ fields.add(new FieldInfo("column1", "intValue", SupportType.INTEGER));
+ fields.add(new FieldInfo("column2", "getStringValue()", SupportType.STRING));
+
+ outputOperator.setDataColumns(fields);
+
+ try {
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_ID, appId);
+
+ outputOperator.setStore(operatorStore);
+ outputOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes));
+ outputOperator.beginWindow(101);
+
+ KeyValPair<String, Object> keyVal = new KeyValPair<String, Object>("test_abc1", new TestClass(1, "abc"));
+
+ outputOperator.input.process(keyVal);
+
+ outputOperator.endWindow();
+
+ outputOperator.teardown();
+
+ operatorStore.connect();
+
+ Map<String, String> out = operatorStore.getMap("test_abc1");
+ Assert.assertEquals("1", out.get("column1"));
+ Assert.assertEquals("abc", out.get("column2"));
+ } finally {
+ operatorStore.remove("test_abc1");
+ operatorStore.disconnect();
+ }
+ }
+
+ public static class ObjectCollectorModule extends BaseOperator
+ {
+ volatile static Map<String, Object> resultMap = new HashMap<String, Object>();
+ static long resultCount = 0;
+
+ public final transient DefaultInputPort<KeyValPair<String, Object>> inputPort = new DefaultInputPort<KeyValPair<String, Object>>()
+ {
+ @Override
+ public void process(KeyValPair<String, Object> tuple)
+ {
+ resultMap.put(tuple.getKey(), tuple.getValue());
+ resultCount++;
+ }
+ };
+ }
+
+ @Test
+ public void testInputOperator() throws IOException
+ {
+ @SuppressWarnings("unused")
+ Class<?> clazz = org.codehaus.janino.CompilerFactory.class;
+
+ this.operatorStore = new RedisStore();
+ this.testStore = new RedisStore();
+
+ testStore.connect();
+ ScanParams params = new ScanParams();
+ params.count(100);
+
+ Map<String, String> value = new HashMap<String, String>();
+ value.put("Column1", "abc");
+ value.put("Column2", "1");
+
+ Map<String, String> value1 = new HashMap<String, String>();
+ value1.put("Column1", "def");
+ value1.put("Column2", "2");
+
+ Map<String, String> value2 = new HashMap<String, String>();
+ value2.put("Column1", "ghi");
+ value2.put("Column2", "3");
+
+ testStore.put("test_abc_in", value);
+ testStore.put("test_def_in", value1);
+ testStore.put("test_ghi_in", value2);
+
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ RedisPOJOInputOperator inputOperator = dag.addOperator("input", new RedisPOJOInputOperator());
+ final ObjectCollectorModule collector = dag.addOperator("collector", new ObjectCollectorModule());
+
+ ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+ fields.add(new FieldInfo("Column1", "stringValue", SupportType.STRING));
+ fields.add(new FieldInfo("Column2", "intValue", SupportType.INTEGER));
+
+ inputOperator.setDataColumns(fields);
+ inputOperator.setOutputClass(TestClass.class.getName());
+
+ inputOperator.setStore(operatorStore);
+ dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+ final LocalMode.Controller lc = lma.getController();
+
+ new Thread("LocalClusterController")
+ {
+ @Override
+ public void run()
+ {
+ long startTms = System.currentTimeMillis();
+ long timeout = 10000L;
+ try {
+ Thread.sleep(1000);
+ while (System.currentTimeMillis() - startTms < timeout) {
+ if (ObjectCollectorModule.resultMap.size() < 3) {
+ Thread.sleep(10);
+ } else {
+ break;
+ }
+ }
+ } catch (InterruptedException ex) {
+ }
+ lc.shutdown();
+ }
+ }.start();
+
+ lc.run();
+
+ Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_abc_in"));
+ Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_def_in"));
+ Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_ghi_in"));
+
+ TestClass a = (TestClass) ObjectCollectorModule.resultMap.get("test_abc_in");
+ Assert.assertNotNull(a);
+ Assert.assertEquals("abc", a.stringValue);
+ Assert.assertEquals("1", a.intValue.toString());
+ } finally {
+ for (KeyValPair<String, String> entry : CollectorModule.resultMap) {
+ testStore.remove(entry.getKey());
+ }
+ testStore.disconnect();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/demos/machinedata/pom.xml
----------------------------------------------------------------------
diff --git a/demos/machinedata/pom.xml b/demos/machinedata/pom.xml
index 1f3f075..3498d0d 100644
--- a/demos/machinedata/pom.xml
+++ b/demos/machinedata/pom.xml
@@ -31,7 +31,7 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
- <version>2.2.1</version>
+ <version>2.5.1</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
[25/25] incubator-apex-malhar git commit: Made the twitter demo us an
embeddable query operator.
Posted by da...@apache.org.
Made the twitter demo us an embeddable query operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/819e175f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/819e175f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/819e175f
Branch: refs/heads/feature-AppData
Commit: 819e175f54ab9869d932ec04b73cfaff451387ac
Parents: 6cff911
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Aug 11 15:23:45 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:32 2015 -0700
----------------------------------------------------------------------
demos/pom.xml | 2 +-
.../twitter/TwitterTopCounterApplication.java | 8 +-
.../src/main/resources/META-INF/properties.xml | 4 +
.../com/datatorrent/lib/appdata/StoreUtils.java | 84 +++++++++++
.../lib/appdata/query/WindowBoundedService.java | 147 +++++++++++++++++++
.../snapshot/AbstractAppDataSnapshotServer.java | 69 +++++++--
.../lib/io/PubSubWebSocketAppDataQuery.java | 82 ++++++++++-
.../query/QueryManagerAsynchronousTest.java | 1 -
.../appdata/query/WindowBoundedServiceTest.java | 124 ++++++++++++++++
pom.xml | 4 +-
10 files changed, 507 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 2e93e02..7976aa0 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -28,7 +28,7 @@
</modules>
<properties>
- <datatorrent.version>3.0.0</datatorrent.version>
+ <datatorrent.version>3.1.0-SNAPSHOT</datatorrent.version>
<datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
<semver.plugin.skip>true</semver.plugin.skip>
</properties>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
index db59bdf..508b8a1 100644
--- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
+++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java
@@ -195,7 +195,7 @@ public class TwitterTopCounterApplication implements StreamingApplication
if (!StringUtils.isEmpty(gatewayAddress)) {
URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
- AppDataSnapshotServerMap snapshotServer = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap());
+ AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
Map<String, String> conversionMap = Maps.newHashMap();
conversionMap.put(alias, WindowedTopCounter.FIELD_TYPE);
@@ -204,15 +204,15 @@ public class TwitterTopCounterApplication implements StreamingApplication
snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
snapshotServer.setTableFieldToMapField(conversionMap);
- PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery());
+ PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery();
wsQuery.setUri(uri);
- Operator.OutputPort<String> queryPort = wsQuery.outputPort;
+ snapshotServer.setEmbeddableQueryInfoProvider(wsQuery);
+
PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
wsResult.setUri(uri);
Operator.InputPort<String> queryResultPort = wsResult.input;
dag.addStream("MapProvider", topCount, snapshotServer.input);
- dag.addStream("Query", queryPort, snapshotServer.query);
dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/demos/twitter/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/twitter/src/main/resources/META-INF/properties.xml b/demos/twitter/src/main/resources/META-INF/properties.xml
index 53b7fb9..e2547cc 100644
--- a/demos/twitter/src/main/resources/META-INF/properties.xml
+++ b/demos/twitter/src/main/resources/META-INF/properties.xml
@@ -90,6 +90,10 @@
<value>TwitterHashtagQueryDemo</value>
</property>
<property>
+ <name>dt.application.TwitterTrendingDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
+ <value>TwitterHashtagQueryDemo</value>
+ </property>
+ <property>
<name>dt.application.TwitterTrendingDemo.operator.QueryResult.topic</name>
<value>TwitterHashtagQueryResultDemo</value>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
new file mode 100644
index 0000000..11efc02
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/appdata/StoreUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.io.SimpleSinglePortInputOperator.BufferingOutputPort;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Sink;
+
+public class StoreUtils
+{
+ /**
+ * This is a utility method which is used to attach the output port of an {@link EmbeddableQueryInfoProvider} to the input port
+ * of the encapsulating {@link AppData.Store}.
+ * @param <T> The type of data emitted by the {@link EmbeddableQueryInfoProvider}'s output port and received by the
+ * {@link AppData.Store}'s input port.
+ * @param outputPort The output port of the {@link EmbeddableQueryInfoProvider} which is being used by an {@link AppData.Store}.
+ * @param inputPort The input port of the {@link AppData.Store} which is using an {@link EmbeddableQueryInfoProvider}.
+ */
+ public static <T> void attachOutputPortToInputPort(DefaultOutputPort<T> outputPort, final DefaultInputPort<T> inputPort)
+ {
+ outputPort.setSink(
+ new Sink<Object>()
+ {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void put(Object tuple)
+ {
+ LOG.debug("processing tuple");
+ inputPort.process((T)tuple);
+ }
+
+ @Override
+ public int getCount(boolean reset)
+ {
+ return 0;
+ }
+
+ }
+ );
+ }
+
+ /**
+ * This is a utility class which is responsible for flushing {@link BufferingOutputPort}s.
+ * @param <TUPLE_TYPE> The type of the tuple emitted by the {@link BufferingOutputPort}.
+ */
+ public static class BufferingOutputPortFlusher<TUPLE_TYPE> implements Runnable
+ {
+ private final BufferingOutputPort<TUPLE_TYPE> port;
+
+ public BufferingOutputPortFlusher(BufferingOutputPort<TUPLE_TYPE> port)
+ {
+ this.port = Preconditions.checkNotNull(port);
+ }
+
+ @Override
+ public void run()
+ {
+ port.flush(Integer.MAX_VALUE);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
new file mode 100644
index 0000000..7b82f4f
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata.query;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This class asynchronously executes a function so that the function is only called between calls
+ * to {@link Operator#beginWindow} and {@link Operator#endWindow}.<br/><br/>
+ * This service works by asynchronously calling its {@link #execute} method only after
+ * {@link #beginWindow} and called and before {@link #endWindow} ends. Calls to {@link #beginWindow}
+ * and {@link endWindow} will happen in the enclosing {@link Operator}'s main thread.
+ * <br/><br/>
+ * <b>Note:</b> This service cannot be used in operators which allow checkpointing within an
+ * application window.
+ */
+public class WindowBoundedService implements Component<OperatorContext>
+{
+ public static final long DEFAULT_FLUSH_INTERVAL_MILLIS = 10;
+
+ /**
+ * The execute interval period in milliseconds.
+ */
+ private final long executeIntervalMillis;
+ /**
+ * The code to execute asynchronously.
+ */
+ private final Runnable runnable;
+ protected transient ExecutorService executorThread;
+
+ private final transient Semaphore mutex = new Semaphore(0);
+
+ public WindowBoundedService(Runnable runnable)
+ {
+ this.executeIntervalMillis = DEFAULT_FLUSH_INTERVAL_MILLIS;
+ this.runnable = Preconditions.checkNotNull(runnable);
+ }
+
+ public WindowBoundedService(long executeIntervalMillis,
+ Runnable runnable)
+ {
+ Preconditions.checkArgument(executeIntervalMillis > 0,
+ "The executeIntervalMillis must be positive");
+ this.executeIntervalMillis = executeIntervalMillis;
+ this.runnable = Preconditions.checkNotNull(runnable);
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ executorThread = Executors.newSingleThreadScheduledExecutor(new NameableThreadFactory("Query Executor Thread"));
+ executorThread.submit(new AsynchExecutorThread(Thread.currentThread()));
+ }
+
+ public void beginWindow(long windowId)
+ {
+ mutex.release();
+ }
+
+ public void endWindow()
+ {
+ try {
+ mutex.acquire();
+ } catch (InterruptedException ex) {
+ DTThrowable.wrapIfChecked(ex);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ executorThread.shutdownNow();
+ }
+
+ public class AsynchExecutorThread implements Callable<Void>
+ {
+ private final Thread mainThread;
+ private long lastExecuteTime = 0;
+
+ public AsynchExecutorThread(Thread mainThread)
+ {
+ this.mainThread = mainThread;
+ }
+
+ @Override
+ @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch"})
+ public Void call() throws Exception
+ {
+ try {
+ loop();
+ } catch (Exception e) {
+ LOG.error("Exception thrown while processing:", e);
+ mutex.release();
+ mainThread.interrupt();
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("SleepWhileInLoop")
+ private void loop() throws Exception
+ {
+ while (true) {
+ long currentTime = System.currentTimeMillis();
+ long diff = currentTime - lastExecuteTime;
+ if (diff > executeIntervalMillis) {
+ lastExecuteTime = currentTime;
+ mutex.acquireUninterruptibly();
+ runnable.run();
+ mutex.release();
+ } else {
+ Thread.sleep(executeIntervalMillis - diff);
+ }
+ }
+ }
+ }
+
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(WindowBoundedService.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index d0241d2..a51908f 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -21,6 +21,7 @@ import java.util.List;
import javax.validation.constraints.NotNull;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.mutable.MutableLong;
+import com.datatorrent.lib.appdata.StoreUtils;
import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager;
import com.datatorrent.lib.appdata.query.QueryExecutor;
@@ -43,8 +45,10 @@ import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
/**
* This is an abstract operator for the {@link SnapshotSchema}. This operator is designed to accept input data
@@ -54,7 +58,7 @@ import com.datatorrent.common.experimental.AppData;
* @param <INPUT_EVENT> The type of the input events that the operator accepts.
* @since 3.0.0
*/
-public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Operator
+public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Operator, AppData.Store<String>
{
/**
* The {@link QueryManagerSynchronous} for the operator.
@@ -84,11 +88,13 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
* The current data to be served by the operator.
*/
private List<GPOMutable> currentData = Lists.newArrayList();
+ private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider;
@AppData.ResultPort
public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<String>();
@AppData.QueryPort
+ @InputPortFieldAnnotation(optional=true)
public transient final DefaultInputPort<String> query = new DefaultInputPort<String>()
{
@Override
@@ -99,24 +105,22 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
try {
query = queryDeserializerFactory.deserialize(queryJSON);
- }
- catch(IOException ex) {
+ } catch (IOException ex) {
LOG.error("Error parsing query: {}", queryJSON);
LOG.error("{}", ex);
return;
}
- if(query instanceof SchemaQuery) {
- SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery) query);
+ if (query instanceof SchemaQuery) {
+ SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query);
- if(schemaResult != null) {
+ if (schemaResult != null) {
String schemaResultJSON = resultSerializerFactory.serialize(schemaResult);
LOG.debug("emitting {}", schemaResultJSON);
queryResult.emit(schemaResultJSON);
}
- }
- else if(query instanceof DataQuerySnapshot) {
- queryProcessor.enqueue((DataQuerySnapshot) query, null, null);
+ } else if (query instanceof DataQuerySnapshot) {
+ queryProcessor.enqueue((DataQuerySnapshot)query, null, null);
}
}
};
@@ -150,6 +154,13 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
*/
public abstract GPOMutable convert(INPUT_EVENT inputEvent);
+
+ @Override
+ final public void activate(OperatorContext ctx)
+ {
+ embeddableQueryInfoProvider.activate(ctx);
+ }
+
@SuppressWarnings("unchecked")
@Override
public void setup(OperatorContext context)
@@ -164,17 +175,33 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry);
resultSerializerFactory = new MessageSerializerFactory(resultFormatter);
queryProcessor.setup(context);
+
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.enableEmbeddedMode();
+ LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName());
+ StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(),
+ query);
+ embeddableQueryInfoProvider.setup(context);
+ }
}
@Override
public void beginWindow(long windowId)
{
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.beginWindow(windowId);
+ }
+
queryProcessor.beginWindow(windowId);
}
@Override
public void endWindow()
{
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.endWindow();
+ }
+
{
Result result = null;
@@ -191,9 +218,21 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
@Override
public void teardown()
{
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.teardown();
+ }
+
queryProcessor.teardown();
}
+ @Override
+ public void deactivate()
+ {
+ if (embeddableQueryInfoProvider != null) {
+ embeddableQueryInfoProvider.deactivate();
+ }
+ }
+
/**
* Gets the JSON for the schema.
* @return the JSON for the schema.
@@ -230,6 +269,18 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
this.resultFormatter = resultFormatter;
}
+ @Override
+ public EmbeddableQueryInfoProvider<String> getEmbeddableQueryInfoProvider()
+ {
+ return embeddableQueryInfoProvider;
+ }
+
+ @Override
+ public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider)
+ {
+ this.embeddableQueryInfoProvider = Preconditions.checkNotNull(embeddableQueryInfoProvider);
+ }
+
/**
* The {@link QueryExecutor} which returns the results for queries.
*/
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 14a2d2b..031befd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -19,16 +19,23 @@ package com.datatorrent.lib.io;
import java.net.URI;
import java.net.URISyntaxException;
+import javax.validation.constraints.Min;
+
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.lib.appdata.StoreUtils.BufferingOutputPortFlusher;
+import com.datatorrent.lib.appdata.query.WindowBoundedService;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
import com.datatorrent.common.util.PubSubMessage;
/**
@@ -40,11 +47,18 @@ import com.datatorrent.common.util.PubSubMessage;
* @tags input, app data, query
* @since 3.0.0
*/
-public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider
+public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider, EmbeddableQueryInfoProvider<String>
{
private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketAppDataQuery.class);
private static final long serialVersionUID = 201506121124L;
+ public static final long DEFAULT_EXECUTE_INTERVAL_MILLIS = 10;
+
+ private boolean useEmitThread;
+ @Min(0)
+ private long executeIntervalMillis = DEFAULT_EXECUTE_INTERVAL_MILLIS;
+
+ private transient WindowBoundedService windowBoundedService;
public PubSubWebSocketAppDataQuery()
{
@@ -57,6 +71,42 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
this.uri = uriHelper(context, uri);
logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
super.setup(context);
+
+ if (useEmitThread) {
+ windowBoundedService = new WindowBoundedService(executeIntervalMillis,
+ new BufferingOutputPortFlusher<>(this.outputPort));
+ windowBoundedService.setup(context);
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+
+ if (windowBoundedService != null) {
+ windowBoundedService.beginWindow(windowId);
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (windowBoundedService != null) {
+ windowBoundedService.endWindow();
+ }
+
+ super.endWindow();
+ }
+
+ @Override
+ public void teardown()
+ {
+ if (windowBoundedService != null) {
+ windowBoundedService.teardown();
+ }
+
+ super.teardown();
}
public static URI uriHelper(OperatorContext context, URI uri)
@@ -139,4 +189,34 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
{
return "pubsub";
}
+
+ @Override
+ public DefaultOutputPort<String> getOutputPort()
+ {
+ return outputPort;
+ }
+
+ @Override
+ public void enableEmbeddedMode()
+ {
+ useEmitThread = true;
+ }
+
+ /**
+ * Get the number of milliseconds between calls to execute.
+ * @return The number of milliseconds between calls to execute.
+ */
+ public long getExecuteIntervalMillis()
+ {
+ return executeIntervalMillis;
+ }
+
+ /**
+ * The number of milliseconds between calls to execute.
+ * @param executeIntervalMillis The number of milliseconds between calls to execute.
+ */
+ public void setExecuteIntervalMillis(long executeIntervalMillis)
+ {
+ this.executeIntervalMillis = executeIntervalMillis;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
index ef5f0f5..277bb3a 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.java
@@ -54,7 +54,6 @@ public class QueryManagerAsynchronousTest
Thread.sleep(200);
}
catch(InterruptedException ex) {
- throw new RuntimeException(ex);
}
Thread.interrupted();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
new file mode 100644
index 0000000..3674ce5
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.appdata.query;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+
+import com.datatorrent.lib.appdata.query.QueryManagerAsynchronousTest.InterruptClear;
+
+public class WindowBoundedServiceTest
+{
+ @Rule
+ public TestWatcher testMeta = new InterruptClear();
+
+ @Test
+ public void simpleLoopTest() throws Exception
+ {
+ CounterRunnable counterRunnable = new CounterRunnable();
+
+ WindowBoundedService wbs = new WindowBoundedService(1,
+ counterRunnable);
+ wbs.setup(null);
+ Thread.sleep(500);
+ Assert.assertEquals(0, counterRunnable.getCounter());
+ wbs.beginWindow(0);
+ Thread.sleep(500);
+ wbs.endWindow();
+ int currentCount = counterRunnable.getCounter();
+ Thread.sleep(500);
+ wbs.teardown();
+ Assert.assertEquals(currentCount, counterRunnable.getCounter());
+ }
+
+ @Test
+ public void runTest() throws Exception
+ {
+ CounterRunnable counterRunnable = new CounterRunnable();
+
+ WindowBoundedService wbs = new WindowBoundedService(1,
+ counterRunnable);
+ wbs.setup(null);
+ wbs.beginWindow(0);
+ Thread.sleep(500);
+ wbs.endWindow();
+ wbs.teardown();
+ Assert.assertTrue(counterRunnable.getCounter() > 0);
+ }
+
+ @Test
+ public void exceptionTest() throws Exception
+ {
+ WindowBoundedService wbs = new WindowBoundedService(1,
+ new ExceptionRunnable());
+
+ wbs.setup(null);
+ wbs.beginWindow(0);
+
+ boolean caughtException = false;
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ caughtException = true;
+ }
+
+ try {
+ wbs.endWindow();
+ } catch(Exception e) {
+ caughtException = true;
+ }
+
+ wbs.teardown();
+ Assert.assertEquals(true, caughtException);
+ }
+
+ public static class CounterRunnable implements Runnable
+ {
+ private int counter = 0;
+
+ public CounterRunnable()
+ {
+ }
+
+ @Override
+ public void run()
+ {
+ counter++;
+ }
+
+ public int getCounter()
+ {
+ return counter;
+ }
+ }
+
+ public static class ExceptionRunnable implements Runnable
+ {
+ public ExceptionRunnable()
+ {
+ }
+
+ @Override
+ public void run()
+ {
+ throw new RuntimeException("Simulate Failure");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/819e175f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c95f524..e9dbcd6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>com.datatorrent</groupId>
<artifactId>dt-framework</artifactId>
- <version>3.0.0</version>
+ <version>3.1.0-SNAPSHOT</version>
</parent>
<artifactId>malhar</artifactId>
@@ -38,7 +38,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netbeans.hint.license>malhar-inc</netbeans.hint.license>
<maven.deploy.skip>false</maven.deploy.skip>
- <dt.framework.version>3.0.0</dt.framework.version>
+ <dt.framework.version>3.1.0-SNAPSHOT</dt.framework.version>
<!-- the following properties match the properties defined in core/pom.xml -->
<jackson.version>1.9.2</jackson.version>
<jersey.version>1.9</jersey.version>
[21/25] incubator-apex-malhar git commit: Improved robustness of
query checking and validation for snapshot schema.
Posted by da...@apache.org.
Improved robustness of query checking and validation for snapshot schema.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6cff9117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6cff9117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6cff9117
Branch: refs/heads/feature-AppData
Commit: 6cff911728b6956e1e4c246aed3fb315707f4032
Parents: 6155673
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Wed Aug 5 14:36:21 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 18:09:31 2015 -0700
----------------------------------------------------------------------
.../serde/DataQuerySnapshotDeserializer.java | 65 ++++++++--
.../query/serde/DataQuerySnapshotValidator.java | 5 +
.../lib/appdata/schemas/DataQuerySnapshot.java | 6 +-
.../lib/appdata/schemas/SchemaUtils.java | 70 ++++++-----
.../DataQuerySnapshotDeserializerTest.java | 123 ++++++++++++++++---
.../resources/snapshotquery_deserialize1.json | 7 ++
.../resources/snapshotquery_deserialize2.json | 12 ++
.../resources/snapshotquery_deserialize3.json | 4 +
.../resources/snapshotquery_deserialize4.json | 13 ++
.../resources/snapshotquery_deserialize5.json | 13 ++
.../resources/snapshotquery_deserialize6.json | 14 +++
.../resources/snapshotquery_deserialize7.json | 15 +++
.../snapshotquery_invalidcountdown.json | 13 ++
.../resources/snapshotquery_validation1.json | 3 +
.../resources/snapshotquery_validation2.json | 3 +
.../resources/snapshotquery_validation3.json | 6 +
16 files changed, 311 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java
index bdc761f..9cc080a 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotDeserializer.java
@@ -30,8 +30,6 @@ import org.slf4j.LoggerFactory;
import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot;
import com.datatorrent.lib.appdata.schemas.Fields;
import com.datatorrent.lib.appdata.schemas.Message;
-import com.datatorrent.lib.appdata.schemas.QRBase;
-import com.datatorrent.lib.appdata.schemas.Query;
import com.datatorrent.lib.appdata.schemas.SchemaUtils;
/**
@@ -40,6 +38,41 @@ import com.datatorrent.lib.appdata.schemas.SchemaUtils;
*/
public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
{
+ public static final Set<Fields> FIRST_LEVEL_FIELD_COMBINATIONS;
+ public static final Set<Fields> DATA_FIELD_COMBINATIONS;
+
+ static {
+ Set<Fields> firstLevelFieldCombinations = Sets.newHashSet();
+ firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+ DataQuerySnapshot.FIELD_TYPE,
+ DataQuerySnapshot.FIELD_COUNTDOWN,
+ DataQuerySnapshot.FIELD_DATA,
+ DataQuerySnapshot.FIELD_INCOMPLETE_RESULTS_OK)));
+ firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+ DataQuerySnapshot.FIELD_TYPE,
+ DataQuerySnapshot.FIELD_DATA,
+ DataQuerySnapshot.FIELD_INCOMPLETE_RESULTS_OK)));
+ firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+ DataQuerySnapshot.FIELD_TYPE,
+ DataQuerySnapshot.FIELD_COUNTDOWN,
+ DataQuerySnapshot.FIELD_DATA)));
+ firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+ DataQuerySnapshot.FIELD_TYPE,
+ DataQuerySnapshot.FIELD_DATA)));
+ firstLevelFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_ID,
+ DataQuerySnapshot.FIELD_TYPE)));
+
+ FIRST_LEVEL_FIELD_COMBINATIONS = firstLevelFieldCombinations;
+
+ Set<Fields> dataFieldCombinations = Sets.newHashSet();
+ dataFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_SCHEMA_KEYS,
+ DataQuerySnapshot.FIELD_FIELDS)));
+ dataFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_SCHEMA_KEYS)));
+ dataFieldCombinations.add(new Fields(Sets.newHashSet(DataQuerySnapshot.FIELD_FIELDS)));
+
+ DATA_FIELD_COMBINATIONS = dataFieldCombinations;
+ }
+
/**
* Constructor used to instantiate deserializer in {@link MessageDeserializerFactory}.
*/
@@ -57,7 +90,11 @@ public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
context);
}
catch(Exception ex) {
- throw new IOException(ex);
+ if (ex instanceof IOException) {
+ throw (IOException) ex;
+ } else {
+ throw new IOException(ex);
+ }
}
}
@@ -74,9 +111,14 @@ public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
{
JSONObject jo = new JSONObject(json);
+ //Validate fields
+ if (!SchemaUtils.checkValidKeys(jo, FIRST_LEVEL_FIELD_COMBINATIONS)) {
+ throw new IOException("Invalid keys");
+ }
+
//// Query id stuff
- String id = jo.getString(QRBase.FIELD_ID);
- String type = jo.getString(Message.FIELD_TYPE);
+ String id = jo.getString(DataQuerySnapshot.FIELD_ID);
+ String type = jo.getString(DataQuerySnapshot.FIELD_TYPE);
if(!type.equals(DataQuerySnapshot.TYPE)) {
LOG.error("Found type {} in the query json, but expected type {}.", type, DataQuerySnapshot.TYPE);
@@ -85,10 +127,10 @@ public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
/// Countdown
long countdown = -1L;
- boolean hasCountdown = jo.has(QRBase.FIELD_COUNTDOWN);
+ boolean hasCountdown = jo.has(DataQuerySnapshot.FIELD_COUNTDOWN);
if(hasCountdown) {
- countdown = jo.getLong(QRBase.FIELD_COUNTDOWN);
+ countdown = jo.getLong(DataQuerySnapshot.FIELD_COUNTDOWN);
}
////Data
@@ -98,8 +140,13 @@ public class DataQuerySnapshotDeserializer implements CustomMessageDeserializer
if(jo.has(DataQuerySnapshot.FIELD_DATA)) {
JSONObject data = jo.getJSONObject(DataQuerySnapshot.FIELD_DATA);
- if(data.has(Query.FIELD_SCHEMA_KEYS)) {
- schemaKeys = SchemaUtils.extractMap(data.getJSONObject(Query.FIELD_SCHEMA_KEYS));
+ if (!SchemaUtils.checkValidKeys(data, DATA_FIELD_COMBINATIONS)) {
+ LOG.error("Error validating {} field", DataQuerySnapshot.FIELD_DATA);
+ throw new IOException("Invalid keys");
+ }
+
+ if (data.has(DataQuerySnapshot.FIELD_SCHEMA_KEYS)) {
+ schemaKeys = SchemaUtils.extractMap(data.getJSONObject(DataQuerySnapshot.FIELD_SCHEMA_KEYS));
}
if(data.has(DataQuerySnapshot.FIELD_FIELDS)) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
index d13dcc4..d2b7ef0 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/serde/DataQuerySnapshotValidator.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot;
+import com.datatorrent.lib.appdata.schemas.Fields;
import com.datatorrent.lib.appdata.schemas.Message;
import com.datatorrent.lib.appdata.schemas.SchemaRegistry;
import com.datatorrent.lib.appdata.schemas.SnapshotSchema;
@@ -53,6 +54,10 @@ public class DataQuerySnapshotValidator implements CustomMessageValidator
return false;
}
+ if (gdqt.getFields().getFields().isEmpty()) {
+ gdqt.setFields(new Fields(fields));
+ }
+
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
index 0943172..5714597 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshot.java
@@ -51,6 +51,10 @@ public class DataQuerySnapshot extends Query
* The JSON string for the schemaKeys in the query.
*/
public static final String SCHEMA_KEYS = "schemaKeys";
+ /**
+ * The JSON string for the incompleteResultOK field in the query.
+ */
+ public static final String FIELD_INCOMPLETE_RESULTS_OK = "incompleteResultOK";
/**
* The fields requested to be returned in the query.
@@ -132,7 +136,7 @@ public class DataQuerySnapshot extends Query
* Sets the fields of the query.
* @param fields The fields of the query.
*/
- private void setFields(Fields fields)
+ public final void setFields(Fields fields)
{
Preconditions.checkNotNull(fields);
this.fields = fields;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
index dbd276c..ce68c7f 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SchemaUtils.java
@@ -19,8 +19,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -100,18 +100,10 @@ public class SchemaUtils
Fields fields)
{
@SuppressWarnings("unchecked")
- Iterator<String> keyIterator = jo.keys();
Set<String> fieldSet = fields.getFields();
+ Set<String> jsonKeys = getSetOfJSONKeys(jo);
- while(keyIterator.hasNext()) {
- String key = keyIterator.next();
-
- if(!fieldSet.contains(key)) {
- return false;
- }
- }
-
- return true;
+ return jsonKeys.containsAll(fieldSet);
}
/**
@@ -124,34 +116,38 @@ public class SchemaUtils
Fields fields)
{
@SuppressWarnings("unchecked")
- Iterator<String> keyIterator = jo.keys();
Set<String> fieldSet = fields.getFields();
+ Set<String> jsonKeys = getSetOfJSONKeys(jo);
- while(keyIterator.hasNext()) {
- String key = keyIterator.next();
+ if (!jsonKeys.containsAll(fieldSet)) {
- if(!fieldSet.contains(key)) {
- throw new IllegalArgumentException("The key " + key +
- " is not valid.");
- }
+ throw new IllegalArgumentException("The given set of keys "
+ + fieldSet
+ + " doesn't equal the set of keys in the json "
+ + jsonKeys);
}
}
/**
* This is a utility method to check that the given JSONObject has the given keys.
* @param jo The {@link JSONObject} to check.
- * @param fieldsList The keys in the {@link JSONObject} to check.
+ * @param fieldsCollection The keys in the {@link JSONObject} to check.
* @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
*/
public static boolean checkValidKeys(JSONObject jo,
- List<Fields> fieldsList)
+ Collection<Fields> fieldsCollection)
{
- for(Fields fields: fieldsList) {
- if(checkValidKeys(jo, fields)) {
+ for (Fields fields: fieldsCollection) {
+ LOG.debug("Checking keys: {}", fields);
+ if (checkValidKeys(jo, fields)) {
return true;
}
}
+ LOG.error("The first level of keys in the provided JSON {} do not match any of the " +
+ "valid keysets {}",
+ getSetOfJSONKeys(jo),
+ fieldsCollection);
return false;
}
@@ -159,31 +155,37 @@ public class SchemaUtils
* This is a utility method to check that the given JSONObject has the given keys.
* It throws an {@link IllegalArgumentException} if it doesn't contain all the given keys.
* @param jo The {@link JSONObject} to check.
- * @param fieldsList The keys in the {@link JSONObject} to check.
+ * @param fieldsCollection The keys in the {@link JSONObject} to check.
* @return True if the given {@link JSONObject} contains all the given keys. False otherwise.
*/
public static boolean checkValidKeysEx(JSONObject jo,
- List<Fields> fieldsList)
+ Collection<Fields> fieldsCollection)
{
- for(Fields fields: fieldsList) {
- if(checkValidKeys(jo, fields)) {
+ for (Fields fields: fieldsCollection) {
+ if (checkValidKeys(jo, fields)) {
return true;
}
}
- Set<String> keys = Sets.newHashSet();
+ Set<String> keys = getSetOfJSONKeys(jo);
+
+ throw new IllegalArgumentException("The given json object has an invalid set of keys: " +
+ keys +
+ "\nOne of the following key combinations was expected:\n" +
+ fieldsCollection);
+ }
+
+ public static Set<String> getSetOfJSONKeys(JSONObject jo)
+ {
@SuppressWarnings("unchecked")
Iterator<String> keyIterator = jo.keys();
+ Set<String> keySet = Sets.newHashSet();
- while(keyIterator.hasNext()) {
- String key = keyIterator.next();
- keys.add(key);
+ while (keyIterator.hasNext()) {
+ keySet.add(keyIterator.next());
}
- throw new IllegalArgumentException("The given json object has an invalid set of keys: " +
- keys +
- "\nOne of the following key combinations was expected:\n" +
- fieldsList);
+ return keySet;
}
public static Map<String, String> convertFieldToType(Map<String, Type> fieldToType)
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java
index a79cd63..f267571 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DataQuerySnapshotDeserializerTest.java
@@ -15,30 +15,48 @@
*/
package com.datatorrent.lib.appdata.schemas;
+import java.io.IOException;
+
import java.util.Map;
+import java.util.Set;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestWatcher;
import com.datatorrent.lib.appdata.query.serde.DataQuerySnapshotDeserializer;
+import com.datatorrent.lib.appdata.query.serde.MessageDeserializerFactory;
public class DataQuerySnapshotDeserializerTest
{
+ @Rule
+ public DataQuerySnapshotInfo testMeta = new DataQuerySnapshotInfo();
+
+ public static class DataQuerySnapshotInfo extends TestWatcher
+ {
+ public MessageDeserializerFactory queryDeserializerFactory;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ String snapshotSchemaJSON = SchemaUtils.jarResourceFileToString("snapshotschema.json");
+ SchemaRegistrySingle schemaRegistry = new SchemaRegistrySingle(new SnapshotSchema(snapshotSchemaJSON));
+ queryDeserializerFactory = new MessageDeserializerFactory(SchemaQuery.class,
+ DataQuerySnapshot.class);
+ queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry);
+ }
+ }
+
@Test
public void simpleDeserializerTest() throws Exception
{
DataQuerySnapshotDeserializer deserializer = new DataQuerySnapshotDeserializer();
- String queryJSON = "{\n"
- + " \"id\": \"1\",\n"
- + " \"type\": \"dataQuery\",\n"
- + " \"data\": {\n"
- + " \"fields\": [ \"url\", \"count\" ]\n"
- + " }\n"
- + "}";
+ String queryJSON = SchemaUtils.jarResourceFileToString("snapshotquery_deserialize1.json");
DataQuerySnapshot gQuery = (DataQuerySnapshot) deserializer.deserialize(queryJSON, DataQuerySnapshot.class, null);
@@ -50,7 +68,6 @@ public class DataQuerySnapshotDeserializerTest
Assert.assertEquals("The fields must equal.", fields, gQuery.getFields());
}
-
@Test
public void simpleDeserializerWithSchemaKeysTest() throws Exception
{
@@ -61,15 +78,7 @@ public class DataQuerySnapshotDeserializerTest
DataQuerySnapshotDeserializer deserializer = new DataQuerySnapshotDeserializer();
- String queryJSON = "{\n"
- + " \"id\": \"1\",\n"
- + " \"type\": \"dataQuery\",\n"
- + " \"data\": {\n"
- + " \"schemaKeys\":"
- + " {\"publisher\":\"google\",\"advertiser\":\"microsoft\",\"location\":\"CA\"},"
- + " \"fields\": [ \"url\", \"count\" ]\n"
- + " }\n"
- + "}";
+ String queryJSON = SchemaUtils.jarResourceFileToString("snapshotquery_deserialize2.json");
DataQuerySnapshot gQuery = (DataQuerySnapshot) deserializer.deserialize(queryJSON, DataQuerySnapshot.class, null);
@@ -81,4 +90,84 @@ public class DataQuerySnapshotDeserializerTest
Assert.assertEquals("The fields must equal.", fields, gQuery.getFields());
Assert.assertEquals(expectedSchemaKeys, gQuery.getSchemaKeys());
}
+
+ @Test
+ public void noFieldsSpecified() throws Exception
+ {
+ String snapshotQuery = SchemaUtils.jarResourceFileToString("snapshotquery_deserialize3.json");
+ DataQuerySnapshot query = (DataQuerySnapshot) testMeta.queryDeserializerFactory.deserialize(snapshotQuery);
+
+ Set<String> expectedFields = Sets.newHashSet("boolField", "intField", "doubleField");
+
+ Assert.assertEquals(expectedFields, query.getFields().getFields());
+ }
+
+ @Test
+ public void validDeserialize1Test() throws Exception
+ {
+ testValid("snapshotquery_deserialize4.json");
+ }
+
+ @Test
+ public void validDeserialize2Test() throws Exception
+ {
+ testValid("snapshotquery_deserialize5.json");
+ }
+
+ @Test
+ public void validDeserialize3Test() throws Exception
+ {
+ testValid("snapshotquery_deserialize6.json");
+ }
+
+ @Test
+ public void validDeserializeExtraFieldTest() throws Exception
+ {
+ testValid("snapshotquery_deserialize7.json");
+ }
+
+ @Test
+ public void invalidTestCountdownValue() throws Exception
+ {
+ testInvalid("snapshotquery_invalidcountdown.json");
+ }
+
+ @Test
+ public void invalidTest1() throws Exception
+ {
+ testInvalid("snapshotquery_validation1.json");
+ }
+
+ @Test
+ public void invalidTest2() throws Exception
+ {
+ testInvalid("snapshotquery_validation2.json");
+ }
+
+ @Test
+ public void invalidTest3() throws Exception
+ {
+ testInvalid("snapshotquery_validation3.json");
+ }
+
+ private void testInvalid(String invalidResourceJSON) throws Exception
+ {
+ boolean caughtException = false;
+
+ try {
+ String snapshotQuery = SchemaUtils.jarResourceFileToString(invalidResourceJSON);
+ testMeta.queryDeserializerFactory.deserialize(snapshotQuery);
+ } catch (IOException e) {
+ caughtException = true;
+ }
+
+ Assert.assertTrue(caughtException);
+ }
+
+ private void testValid(String validResourceJSON) throws Exception
+ {
+ String snapshotQuery = SchemaUtils.jarResourceFileToString(validResourceJSON);
+ DataQuerySnapshot query = (DataQuerySnapshot) testMeta.queryDeserializerFactory.deserialize(snapshotQuery);
+ Assert.assertNotNull(query);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize1.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize1.json b/library/src/test/resources/snapshotquery_deserialize1.json
new file mode 100644
index 0000000..13c80ac
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize1.json
@@ -0,0 +1,7 @@
+{
+"id": 1,
+"type": "dataQuery",
+"data": {
+ "fields": ["url", "count"]
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize2.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize2.json b/library/src/test/resources/snapshotquery_deserialize2.json
new file mode 100644
index 0000000..8ebc168
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize2.json
@@ -0,0 +1,12 @@
+{
+ "id":1,
+ "type": "dataQuery",
+ "data": {
+ "schemaKeys": {
+ "publisher":"google",
+ "advertiser":"microsoft",
+ "location":"CA"
+ },
+ "fields": ["url", "count"]
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize3.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize3.json b/library/src/test/resources/snapshotquery_deserialize3.json
new file mode 100644
index 0000000..2d706af
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize3.json
@@ -0,0 +1,4 @@
+{
+"id": 1,
+"type": "dataQuery"
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize4.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize4.json b/library/src/test/resources/snapshotquery_deserialize4.json
new file mode 100644
index 0000000..3a150cf
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize4.json
@@ -0,0 +1,13 @@
+{
+ "id":1,
+ "type": "dataQuery",
+ "data": {
+ "schemaKeys": {
+ "publisher":"google",
+ "advertiser":"microsoft",
+ "location":"CA"
+ },
+ "fields": ["boolField", "doubleField"]
+ },
+ "countdown":10
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize5.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize5.json b/library/src/test/resources/snapshotquery_deserialize5.json
new file mode 100644
index 0000000..e4907ad
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize5.json
@@ -0,0 +1,13 @@
+{
+ "id":1,
+ "type": "dataQuery",
+ "data": {
+ "schemaKeys": {
+ "publisher":"google",
+ "advertiser":"microsoft",
+ "location":"CA"
+ },
+ "fields": ["boolField", "doubleField"]
+ },
+ "incompleteResultOK":true
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize6.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize6.json b/library/src/test/resources/snapshotquery_deserialize6.json
new file mode 100644
index 0000000..abeddf4
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize6.json
@@ -0,0 +1,14 @@
+{
+ "id":1,
+ "type": "dataQuery",
+ "data": {
+ "schemaKeys": {
+ "publisher":"google",
+ "advertiser":"microsoft",
+ "location":"CA"
+ },
+ "fields": ["boolField", "doubleField"]
+ },
+ "incompleteResultOK":true,
+ "countdown":10
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_deserialize7.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_deserialize7.json b/library/src/test/resources/snapshotquery_deserialize7.json
new file mode 100644
index 0000000..3347b3c
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_deserialize7.json
@@ -0,0 +1,15 @@
+{
+ "id":1,
+ "type": "dataQuery",
+ "data": {
+ "schemaKeys": {
+ "publisher":"google",
+ "advertiser":"microsoft",
+ "location":"CA"
+ },
+ "fields": ["boolField", "doubleField"]
+ },
+ "incompleteResultOK":true,
+ "countdown":10,
+ "blahblahblahinvalid":"a"
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_invalidcountdown.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_invalidcountdown.json b/library/src/test/resources/snapshotquery_invalidcountdown.json
new file mode 100644
index 0000000..8551c83
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_invalidcountdown.json
@@ -0,0 +1,13 @@
+{
+ "id":1,
+ "type": "dataQuery",
+ "countdown":-10,
+ "data": {
+ "schemaKeys": {
+ "publisher":"google",
+ "advertiser":"microsoft",
+ "location":"CA"
+ },
+ "fields": ["url", "count"]
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_validation1.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_validation1.json b/library/src/test/resources/snapshotquery_validation1.json
new file mode 100644
index 0000000..2572ae5
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_validation1.json
@@ -0,0 +1,3 @@
+{
+ "id": 1
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_validation2.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_validation2.json b/library/src/test/resources/snapshotquery_validation2.json
new file mode 100644
index 0000000..40fe011
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_validation2.json
@@ -0,0 +1,3 @@
+{
+ "type": "dataQuery"
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6cff9117/library/src/test/resources/snapshotquery_validation3.json
----------------------------------------------------------------------
diff --git a/library/src/test/resources/snapshotquery_validation3.json b/library/src/test/resources/snapshotquery_validation3.json
new file mode 100644
index 0000000..e5aaad0
--- /dev/null
+++ b/library/src/test/resources/snapshotquery_validation3.json
@@ -0,0 +1,6 @@
+{
+ "id": 1,
+ "type": "dataQuery",
+ "data": {
+ }
+}