You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/09/22 17:47:53 UTC

[1/3] git commit: Merge pull request #4 from apache/master

Repository: incubator-streams
Updated Branches:
  refs/heads/master dc5e7de3c -> 2397f4b48


Merge pull request #4 from apache/master

Merge Apache Master

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/507e6796
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/507e6796
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/507e6796

Branch: refs/heads/master
Commit: 507e67962e97e5e6b4c1de0bc0789c25804cd6af
Parents: dca6475 dc5e7de
Author: Ryan Ebanks <ry...@raveldata.com>
Authored: Fri Sep 19 11:03:37 2014 -0500
Committer: Ryan Ebanks <ry...@raveldata.com>
Committed: Fri Sep 19 11:03:37 2014 -0500

----------------------------------------------------------------------
 README.txt                                      |    2 +-
 pom.xml                                         |    2 +-
 streams-contrib/pom.xml                         |    1 +
 .../streams/console/ConsolePersistWriter.java   |    5 +-
 .../ElasticsearchConfigurator.java              |   41 +-
 .../ElasticsearchPersistDeleter.java            |   34 +-
 .../ElasticsearchPersistUpdater.java            |   66 +-
 .../ElasticsearchPersistWriter.java             |   65 +-
 .../ElasticsearchWriterConfiguration.json       |    6 +-
 .../streams-processor-jackson/pom.xml           |   86 ++
 .../CleanAdditionalPropertiesProcessor.java     |   62 +
 .../regex/AbstractRegexExtensionExtractor.java  |   13 +-
 .../org/apache/streams/urls/LinkResolver.java   |    6 +
 .../streams-provider-datasift/pom.xml           |   21 +-
 .../streams/datasift/csdl/DatasiftCsdlUtil.java |    6 +-
 .../DatasiftTypeConverterProcessor.java         |  154 +++
 .../provider/DatasiftManagedSourceSetup.java    |   76 ++
 .../provider/DatasiftStreamConfigurator.java    |   15 +-
 .../DatasiftTypeConverterProcessor.java         |  167 ---
 .../serializer/DatasiftActivitySerializer.java  |    5 +-
 .../DatasiftDefaultActivitySerializer.java      |   13 +-
 .../DatasiftInstagramActivitySerializer.java    |  118 ++
 .../DatasiftTweetActivitySerializer.java        |    6 +
 .../main/jsonschema/com/datasift/Datasift.json  | 1103 ------------------
 .../com/datasift/DatasiftConfiguration.json     |   22 -
 .../com/datasift/DatasiftPushConfiguration.json |   17 -
 .../datasift/DatasiftStreamConfiguration.json   |   17 -
 .../com/datasift/DatasiftTwitterUser.json       |   68 --
 .../org/apache/streams/datasift/Datasift.json   |  458 ++++++++
 .../streams/datasift/DatasiftConfiguration.json |  133 +++
 .../datasift/DatasiftPushConfiguration.json     |   17 +
 .../datasift/DatasiftStreamConfiguration.json   |   17 +
 .../datasift/facebook/DatasiftFacebook.json     |  120 ++
 .../datasift/instagram/DatasiftInstagram.json   |  178 +++
 .../interaction/DatasiftInteraction.json        |   92 ++
 .../datasift/twitter/DatasiftTwitter.json       |  365 ++++++
 .../datasift/twitter/DatasiftTwitterMedia.json  |  126 ++
 .../datasift/twitter/DatasiftTwitterUser.json   |   68 ++
 .../DatasiftTypeConverterProcessorTest.java     |    5 +-
 .../DatasiftActivitySerializerTest.java         |   19 +-
 .../test/resources/instagram_datasift_json.txt  |   25 +
 .../metadata/facebook_post.mup                  |  694 +++++++++++
 .../metadata/facebook_post.png                  |  Bin 0 -> 1038039 bytes
 .../api/FacebookPostActivitySerializer.java     |   29 +-
 .../processor/FacebookTypeConverter.java        |   12 +-
 .../pagefeed/FacebookDataCollector.java         |  138 +++
 .../pagefeed/FacebookPageFeedDataCollector.java |  124 ++
 .../pagefeed/FacebookPageFeedProvider.java      |   28 +
 .../provider/pagefeed/FacebookProvider.java     |  139 +++
 .../FacebookStreamsPostSerializer.java          |   60 +
 .../streams/facebook/FacebookConfiguration.json |   33 +
 .../provider/pagefeed/TestFacebookProvider.java |   94 ++
 .../processor/InstagramTypeConverter.java       |   19 +-
 .../provider/InstagramAbstractProvider.java     |  196 ++++
 .../provider/InstagramDataCollector.java        |  145 +++
 .../provider/InstagramRecentMediaCollector.java |  171 ---
 .../provider/InstagramRecentMediaProvider.java  |  201 ----
 .../InstagramRecentMediaCollector.java          |  111 ++
 .../InstagramRecentMediaProvider.java           |   60 +
 .../userinfo/InstagramUserInfoCollector.java    |   92 ++
 .../userinfo/InstagramUserInfoProvider.java     |   39 +
 .../serializer/InstagramUserInfoSerializer.java |   83 ++
 .../InstagramRecentMediaCollectorTest.java      |  159 ---
 .../InstagramRecentMediaProviderTest.java       |  175 ---
 .../InstagramRecentMediaCollectorTest.java      |  156 +++
 .../InstagramRecentMediaProviderTest.java       |  174 +++
 .../InstagramUserInfoCollectorTest.java         |  120 ++
 .../apache/streams/data/util/MoreoverUtils.java |    2 +-
 .../SysomosBeatActivityConverter.java           |    7 +-
 .../FetchAndReplaceTwitterProcessor.java        |    4 +-
 .../twitter/provider/TwitterConfigurator.java   |   70 ++
 .../provider/TwitterStreamConfigurator.java     |   99 --
 .../twitter/provider/TwitterStreamProvider.java |    2 +-
 .../provider/TwitterTimelineProvider.java       |    8 +-
 .../TwitterUserInformationProvider.java         |    4 +-
 .../apache/streams/core/util/DatumUtils.java    |   49 +
 streams-pojo/metadata/Serializer.mup            |  371 ++++++
 streams-pojo/metadata/serializer.png            |  Bin 0 -> 604337 bytes
 .../org/apache/streams/data/util/JsonUtil.java  |    8 +-
 .../jackson/StreamsDateTimeSerializer.java      |    4 -
 .../streams/jackson/StreamsJacksonMapper.java   |   10 -
 .../jackson/StreamsPeriodDeserializer.java      |    1 -
 .../jackson/StreamsPeriodSerializer.java        |    2 -
 .../org/apache/streams/pojo/json/activity.json  |    3 +-
 .../data/data/util/DateTimeSerDeTest.java       |    2 -
 streams-runtimes/streams-runtime-local/pom.xml  |    6 +
 .../local/tasks/StreamsPersistWriterTask.java   |    2 +
 .../local/tasks/StreamsProcessorTask.java       |    6 +-
 .../local/tasks/StreamsProviderTask.java        |   19 +-
 .../local/builders/LocalStreamBuilderTest.java  |   37 +-
 .../local/builders/ToyLocalBuilderExample.java  |    6 +-
 .../streams/local/tasks/BasicTasksTest.java     |    6 +-
 .../local/tasks/StreamsProviderTaskTest.java    |  147 +++
 .../test/processors/DoNothingProcessor.java     |    2 +-
 .../PassthroughDatumCounterProcessor.java       |    2 +-
 .../test/providers/NumericMessageProvider.java  |    2 +-
 .../local/test/writer/DatumCounterWriter.java   |    2 +-
 .../local/test/writer/DoNothingWriter.java      |    2 +-
 .../local/test/writer/SystemOutWriter.java      |    2 +-
 .../apache/streams/util/SerializationUtil.java  |    4 +-
 100 files changed, 5544 insertions(+), 2419 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: STREAMS-179 | Added new thread pool executor to shutdown streams

Posted by mf...@apache.org.
STREAMS-179 | Added new thread pool executor to shutdown streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e091c6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e091c6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e091c6cc

Branch: refs/heads/master
Commit: e091c6ccc4f6ca849a638402ce8de5a7d73a8df0
Parents: 507e679
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:22:32 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:22:32 2014 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      |   3 +-
 ...amOnUnhandleThrowableThreadPoolExecutor.java |  45 ++++++++
 ...nhandledThrowableThreadPoolExecutorTest.java | 103 +++++++++++++++++++
 3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 25f9fe7..bec1ff9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.builders;
 
 import org.apache.log4j.spi.LoggerFactory;
 import org.apache.streams.core.*;
+import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
 import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
 import org.apache.streams.local.tasks.StatusCounterMonitorThread;
 import org.apache.streams.local.tasks.StreamsProviderTask;
@@ -173,7 +174,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     public void start() {
         attachShutdownHandler();
         boolean isRunning = true;
-        this.executor = Executors.newFixedThreadPool(this.totalTasks);
+        this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
         this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
         tasks = new HashMap<String, List<StreamsTask>>();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
new file mode 100644
index 0000000..f8d6343
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -0,0 +1,45 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * @see {@link java.util.concurrent.ThreadPoolExecutor}
+ */
+public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class);
+
+    private LocalStreamBuilder streamBuilder;
+    private volatile boolean isStoped;
+
+    /**
+     * Creates a fixed size thread pool where corePoolSize & maximumPoolSize equal numThreads with an unbounded queue.
+     * @param numThreads number of threads in pool
+     * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable
+     */
+    public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) {
+        super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        this.streamBuilder = streamBuilder;
+        this.isStoped = false;
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        if(t != null) {
+            LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", r.getClass(), t);
+            LOGGER.error("Attempting to shut down stream.");
+            synchronized (this) {
+                if (!this.isStoped) {
+                    this.isStoped = true;
+                    this.streamBuilder.stop();
+                }
+            }
+        } else {
+            LOGGER.trace("Runnable, {}, finished executing.", r.getClass());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
new file mode 100644
index 0000000..17e8dd9
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -0,0 +1,103 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
+
+
+    @Test
+    public void testShutDownOnException() {
+        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+        final AtomicBoolean isShutdown = new AtomicBoolean(false);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                isShutdown.set(true);
+                return null;
+            }
+        }).when(sb).stop();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                latch.countDown();
+                throw new RuntimeException("Testing Throwable Handling!");
+            }
+        };
+
+        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+        executor.execute(runnable);
+        try {
+            latch.await();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        executor.shutdownNow();
+        try {
+            executor.awaitTermination(1, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+    }
+
+
+    @Test
+    public void testNormalExecution() {
+        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+        final AtomicBoolean isShutdown = new AtomicBoolean(false);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                isShutdown.set(true);
+                return null;
+            }
+        }).when(sb).stop();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                latch.countDown();
+            }
+        };
+
+        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+        executor.execute(runnable);
+        try {
+            latch.await();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        executor.shutdownNow();
+        try {
+            executor.awaitTermination(1, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get());
+    }
+
+
+}


[3/3] git commit: STREAMS-179 | Added javdoc comments.

Posted by mf...@apache.org.
STREAMS-179 | Added javdoc comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2397f4b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2397f4b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2397f4b4

Branch: refs/heads/master
Commit: 2397f4b480448ebd3e1a3c4efa4d9dc5ef64750a
Parents: e091c6c
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:24:22 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:24:22 2014 -0500

----------------------------------------------------------------------
 .../ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java        | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2397f4b4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index f8d6343..55a26e1 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.*;
 
 /**
+ * A fixed ThreadPoolExecutor that will shutdown a stream upon a thread ending execution due to an unhandled throwable.
  * @see {@link java.util.concurrent.ThreadPoolExecutor}
  */
 public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {