You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/09/22 17:47:53 UTC
[1/3] git commit: Merge pull request #4 from apache/master
Repository: incubator-streams
Updated Branches:
refs/heads/master dc5e7de3c -> 2397f4b48
Merge pull request #4 from apache/master
Merge Apache Master
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/507e6796
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/507e6796
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/507e6796
Branch: refs/heads/master
Commit: 507e67962e97e5e6b4c1de0bc0789c25804cd6af
Parents: dca6475 dc5e7de
Author: Ryan Ebanks <ry...@raveldata.com>
Authored: Fri Sep 19 11:03:37 2014 -0500
Committer: Ryan Ebanks <ry...@raveldata.com>
Committed: Fri Sep 19 11:03:37 2014 -0500
----------------------------------------------------------------------
README.txt | 2 +-
pom.xml | 2 +-
streams-contrib/pom.xml | 1 +
.../streams/console/ConsolePersistWriter.java | 5 +-
.../ElasticsearchConfigurator.java | 41 +-
.../ElasticsearchPersistDeleter.java | 34 +-
.../ElasticsearchPersistUpdater.java | 66 +-
.../ElasticsearchPersistWriter.java | 65 +-
.../ElasticsearchWriterConfiguration.json | 6 +-
.../streams-processor-jackson/pom.xml | 86 ++
.../CleanAdditionalPropertiesProcessor.java | 62 +
.../regex/AbstractRegexExtensionExtractor.java | 13 +-
.../org/apache/streams/urls/LinkResolver.java | 6 +
.../streams-provider-datasift/pom.xml | 21 +-
.../streams/datasift/csdl/DatasiftCsdlUtil.java | 6 +-
.../DatasiftTypeConverterProcessor.java | 154 +++
.../provider/DatasiftManagedSourceSetup.java | 76 ++
.../provider/DatasiftStreamConfigurator.java | 15 +-
.../DatasiftTypeConverterProcessor.java | 167 ---
.../serializer/DatasiftActivitySerializer.java | 5 +-
.../DatasiftDefaultActivitySerializer.java | 13 +-
.../DatasiftInstagramActivitySerializer.java | 118 ++
.../DatasiftTweetActivitySerializer.java | 6 +
.../main/jsonschema/com/datasift/Datasift.json | 1103 ------------------
.../com/datasift/DatasiftConfiguration.json | 22 -
.../com/datasift/DatasiftPushConfiguration.json | 17 -
.../datasift/DatasiftStreamConfiguration.json | 17 -
.../com/datasift/DatasiftTwitterUser.json | 68 --
.../org/apache/streams/datasift/Datasift.json | 458 ++++++++
.../streams/datasift/DatasiftConfiguration.json | 133 +++
.../datasift/DatasiftPushConfiguration.json | 17 +
.../datasift/DatasiftStreamConfiguration.json | 17 +
.../datasift/facebook/DatasiftFacebook.json | 120 ++
.../datasift/instagram/DatasiftInstagram.json | 178 +++
.../interaction/DatasiftInteraction.json | 92 ++
.../datasift/twitter/DatasiftTwitter.json | 365 ++++++
.../datasift/twitter/DatasiftTwitterMedia.json | 126 ++
.../datasift/twitter/DatasiftTwitterUser.json | 68 ++
.../DatasiftTypeConverterProcessorTest.java | 5 +-
.../DatasiftActivitySerializerTest.java | 19 +-
.../test/resources/instagram_datasift_json.txt | 25 +
.../metadata/facebook_post.mup | 694 +++++++++++
.../metadata/facebook_post.png | Bin 0 -> 1038039 bytes
.../api/FacebookPostActivitySerializer.java | 29 +-
.../processor/FacebookTypeConverter.java | 12 +-
.../pagefeed/FacebookDataCollector.java | 138 +++
.../pagefeed/FacebookPageFeedDataCollector.java | 124 ++
.../pagefeed/FacebookPageFeedProvider.java | 28 +
.../provider/pagefeed/FacebookProvider.java | 139 +++
.../FacebookStreamsPostSerializer.java | 60 +
.../streams/facebook/FacebookConfiguration.json | 33 +
.../provider/pagefeed/TestFacebookProvider.java | 94 ++
.../processor/InstagramTypeConverter.java | 19 +-
.../provider/InstagramAbstractProvider.java | 196 ++++
.../provider/InstagramDataCollector.java | 145 +++
.../provider/InstagramRecentMediaCollector.java | 171 ---
.../provider/InstagramRecentMediaProvider.java | 201 ----
.../InstagramRecentMediaCollector.java | 111 ++
.../InstagramRecentMediaProvider.java | 60 +
.../userinfo/InstagramUserInfoCollector.java | 92 ++
.../userinfo/InstagramUserInfoProvider.java | 39 +
.../serializer/InstagramUserInfoSerializer.java | 83 ++
.../InstagramRecentMediaCollectorTest.java | 159 ---
.../InstagramRecentMediaProviderTest.java | 175 ---
.../InstagramRecentMediaCollectorTest.java | 156 +++
.../InstagramRecentMediaProviderTest.java | 174 +++
.../InstagramUserInfoCollectorTest.java | 120 ++
.../apache/streams/data/util/MoreoverUtils.java | 2 +-
.../SysomosBeatActivityConverter.java | 7 +-
.../FetchAndReplaceTwitterProcessor.java | 4 +-
.../twitter/provider/TwitterConfigurator.java | 70 ++
.../provider/TwitterStreamConfigurator.java | 99 --
.../twitter/provider/TwitterStreamProvider.java | 2 +-
.../provider/TwitterTimelineProvider.java | 8 +-
.../TwitterUserInformationProvider.java | 4 +-
.../apache/streams/core/util/DatumUtils.java | 49 +
streams-pojo/metadata/Serializer.mup | 371 ++++++
streams-pojo/metadata/serializer.png | Bin 0 -> 604337 bytes
.../org/apache/streams/data/util/JsonUtil.java | 8 +-
.../jackson/StreamsDateTimeSerializer.java | 4 -
.../streams/jackson/StreamsJacksonMapper.java | 10 -
.../jackson/StreamsPeriodDeserializer.java | 1 -
.../jackson/StreamsPeriodSerializer.java | 2 -
.../org/apache/streams/pojo/json/activity.json | 3 +-
.../data/data/util/DateTimeSerDeTest.java | 2 -
streams-runtimes/streams-runtime-local/pom.xml | 6 +
.../local/tasks/StreamsPersistWriterTask.java | 2 +
.../local/tasks/StreamsProcessorTask.java | 6 +-
.../local/tasks/StreamsProviderTask.java | 19 +-
.../local/builders/LocalStreamBuilderTest.java | 37 +-
.../local/builders/ToyLocalBuilderExample.java | 6 +-
.../streams/local/tasks/BasicTasksTest.java | 6 +-
.../local/tasks/StreamsProviderTaskTest.java | 147 +++
.../test/processors/DoNothingProcessor.java | 2 +-
.../PassthroughDatumCounterProcessor.java | 2 +-
.../test/providers/NumericMessageProvider.java | 2 +-
.../local/test/writer/DatumCounterWriter.java | 2 +-
.../local/test/writer/DoNothingWriter.java | 2 +-
.../local/test/writer/SystemOutWriter.java | 2 +-
.../apache/streams/util/SerializationUtil.java | 4 +-
100 files changed, 5544 insertions(+), 2419 deletions(-)
----------------------------------------------------------------------
[2/3] git commit: STREAMS-179 | Added new thread pool executor to
shutdown streams
Posted by mf...@apache.org.
STREAMS-179 | Added new thread pool executor to shutdown streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e091c6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e091c6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e091c6cc
Branch: refs/heads/master
Commit: e091c6ccc4f6ca849a638402ce8de5a7d73a8df0
Parents: 507e679
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:22:32 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:22:32 2014 -0500
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 3 +-
...amOnUnhandleThrowableThreadPoolExecutor.java | 45 ++++++++
...nhandledThrowableThreadPoolExecutorTest.java | 103 +++++++++++++++++++
3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 25f9fe7..bec1ff9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.builders;
import org.apache.log4j.spi.LoggerFactory;
import org.apache.streams.core.*;
+import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
import org.apache.streams.local.tasks.StatusCounterMonitorThread;
import org.apache.streams.local.tasks.StreamsProviderTask;
@@ -173,7 +174,7 @@ public class LocalStreamBuilder implements StreamBuilder {
public void start() {
attachShutdownHandler();
boolean isRunning = true;
- this.executor = Executors.newFixedThreadPool(this.totalTasks);
+ this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
tasks = new HashMap<String, List<StreamsTask>>();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
new file mode 100644
index 0000000..f8d6343
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -0,0 +1,45 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * @see {@link java.util.concurrent.ThreadPoolExecutor}
+ */
+public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class);
+
+    /** Stream builder whose {@code stop()} is invoked when a task ends with an unhandled throwable. */
+    private final LocalStreamBuilder streamBuilder;
+    /** Set to true once {@code stop()} has been requested so that it is requested at most once. */
+    private volatile boolean isStoped;
+
+    /**
+     * Creates a fixed size thread pool where corePoolSize and maximumPoolSize equal numThreads with an unbounded queue.
+     * @param numThreads number of threads in pool
+     * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable
+     */
+    public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) {
+        super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        this.streamBuilder = streamBuilder;
+        this.isStoped = false;
+    }
+
+    /**
+     * Logs the outcome of a finished task and, the first time a task ends with a throwable,
+     * calls {@code stop()} on the stream builder supplied at construction.
+     *
+     * NOTE(review): per the ThreadPoolExecutor documentation, tasks handed to {@code submit()} are wrapped
+     * in a FutureTask that captures the failure, so {@code t} is null for them and this shutdown path is
+     * not taken; confirm that the stream tasks are started with {@code execute()}.
+     *
+     * @param r the runnable that has completed
+     * @param t the throwable that caused termination, or null if execution completed normally
+     */
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        // Recommended by the ThreadPoolExecutor documentation so that nested overrides keep working.
+        super.afterExecute(r, t);
+        if (t != null) {
+            // The throwable is passed as the trailing argument WITHOUT a placeholder of its own;
+            // only then does SLF4J treat it as an exception and log the full stack trace.
+            LOGGER.error("Runnable, {}, exited with an unhandled throwable!", r.getClass(), t);
+            LOGGER.error("Attempting to shut down stream.");
+            // Several worker threads may fail at the same time; the lock plus the flag
+            // ensure stop() is only invoked by the first one.
+            synchronized (this) {
+                if (!this.isStoped) {
+                    this.isStoped = true;
+                    this.streamBuilder.stop();
+                }
+            }
+        } else {
+            LOGGER.trace("Runnable, {}, finished executing.", r.getClass());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
new file mode 100644
index 0000000..17e8dd9
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -0,0 +1,103 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link ShutdownStreamOnUnhandleThrowableThreadPoolExecutor} calls stop() on its stream builder
+ * when a task throws, and does not call it when a task completes normally.
+ */
+public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
+
+
+    /**
+     * A runnable that throws must result in stop() being called on the stream builder.
+     */
+    @Test
+    public void testShutDownOnException() {
+        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+        // Records whether stop() was invoked on the mocked builder.
+        final AtomicBoolean isShutdown = new AtomicBoolean(false);
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                isShutdown.set(true);
+                return null;
+            }
+        }).when(sb).stop();
+
+        // Lets the test thread know the runnable has started before the pool is shut down.
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                latch.countDown();
+                throw new RuntimeException("Testing Throwable Handling!");
+            }
+        };
+
+        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+        executor.execute(runnable);
+        try {
+            latch.await();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        executor.shutdownNow();
+        try {
+            // Gives the worker thread time to finish afterExecute before asserting.
+            executor.awaitTermination(1, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+    }
+
+
+    /**
+     * A runnable that completes normally must not result in stop() being called on the stream builder.
+     */
+    @Test
+    public void testNormalExecution() {
+        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+        // Records whether stop() was invoked on the mocked builder.
+        final AtomicBoolean isShutdown = new AtomicBoolean(false);
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                isShutdown.set(true);
+                return null;
+            }
+        }).when(sb).stop();
+
+        // Lets the test thread know the runnable has started before the pool is shut down.
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                latch.countDown();
+            }
+        };
+
+        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+        executor.execute(runnable);
+        try {
+            latch.await();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        executor.shutdownNow();
+        try {
+            // Gives the worker thread time to finish afterExecute before asserting.
+            executor.awaitTermination(1, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        // The failure message now matches the assertion: stop() must NOT have been invoked.
+        assertFalse("Expected StreamBuilder shutdown not to be called", isShutdown.get());
+    }
+
+
+}
[3/3] git commit: STREAMS-179 | Added javadoc comments.
Posted by mf...@apache.org.
STREAMS-179 | Added javadoc comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2397f4b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2397f4b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2397f4b4
Branch: refs/heads/master
Commit: 2397f4b480448ebd3e1a3c4efa4d9dc5ef64750a
Parents: e091c6c
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:24:22 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:24:22 2014 -0500
----------------------------------------------------------------------
.../ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2397f4b4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index f8d6343..55a26e1 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
/**
+ * A fixed ThreadPoolExecutor that will shutdown a stream upon a thread ending execution due to an unhandled throwable.
* @see {@link java.util.concurrent.ThreadPoolExecutor}
*/
public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {