You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:21 UTC

[43/71] [abbrv] git commit: some refactoring for dataflow

some refactoring for dataflow

git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1572361 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/74650657
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/74650657
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/74650657

Branch: refs/heads/master
Commit: 74650657f1693ba4ca2ba539990d23a0e2a0bab1
Parents: 396eb4a
Author: sblackmon <sb...@unknown>
Authored: Thu Feb 27 00:13:52 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Thu Feb 27 00:13:52 2014 +0000

----------------------------------------------------------------------
 .../streams/console/ConsolePersistReader.java   | 80 ++++++++++++++++++++
 .../streams/console/ConsolePersistWriter.java   | 10 +--
 .../twitter/provider/TwitterStreamProvider.java |  2 +
 3 files changed, 84 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74650657/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
new file mode 100644
index 0000000..460d95d
--- /dev/null
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -0,0 +1,80 @@
+package org.apache.streams.console;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ConsolePersistReader implements StreamsPersistReader { // Reads lines from System.in; each line becomes one StreamsDatum on persistQueue.
+    // Name passed as the first argument of every log message in this class.
+    private final static String STREAMS_ID = "ConsolePersistReader";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistReader.class);
+    // Receives one StreamsDatum per line read; created by the no-arg constructor or supplied by the caller.
+    protected volatile Queue<StreamsDatum> persistQueue;
+    // NOTE(review): mapper is not referenced anywhere else in this class.
+    private ObjectMapper mapper = new ObjectMapper();
+    // Creates a reader backed by a new ConcurrentLinkedQueue.
+    public ConsolePersistReader() {
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+    // Creates a reader that appends to the given queue (stored as-is, no null check).
+    public ConsolePersistReader(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
+    }
+    // No-op: the argument is ignored.
+    public void prepare(Object o) {
+
+    }
+    // No-op.
+    public void cleanUp() {
+
+    }
+    // Delegates to readCurrent().
+    @Override
+    public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+    // Reads System.in line by line until hasNextLine() returns false, queueing one StreamsDatum per line.
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        LOGGER.info("{} readCurrent", STREAMS_ID);
+        // The Scanner is never closed in this method.
+        Scanner sc = new Scanner(System.in);
+        // hasNextLine() may block waiting for input, so this loop ends only when stdin is exhausted.
+        while( sc.hasNextLine() ) {
+
+            persistQueue.offer(new StreamsDatum(sc.nextLine()));
+
+        }
+        // persistQueue is not cleared first, so size() also counts anything queued before this call.
+        LOGGER.info("{} providing {} docs", STREAMS_ID, persistQueue.size());
+        LOGGER.info("{} Exiting", STREAMS_ID);
+        // NOTE(review): a ConcurrentLinkedQueue (the default) is a JDK type and cannot be a StreamsResultSet, so this cast looks like it throws ClassCastException unless a compatible queue was injected — confirm.
+        return (StreamsResultSet) persistQueue;
+    }
+    // Ignores sequence and delegates to readCurrent().
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return readCurrent();
+    }
+    // Ignores start and end and delegates to readCurrent().
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return readCurrent();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74650657/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index 8f56821..b86beb9 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class ConsolePersistWriter implements StreamsPersistWriter, Runnable {
+public class ConsolePersistWriter implements StreamsPersistWriter {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
 
@@ -29,7 +29,7 @@ public class ConsolePersistWriter implements StreamsPersistWriter, Runnable {
     }
 
     public void prepare(Object o) {
-
+        Preconditions.checkNotNull(persistQueue);
     }
 
     public void cleanUp() {
@@ -51,10 +51,4 @@ public class ConsolePersistWriter implements StreamsPersistWriter, Runnable {
 
     }
 
-    @Override
-    public void run() {
-        Preconditions.checkNotNull(persistQueue);
-        new Thread(new ConsolePersistWriterTask(this)).start();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74650657/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 34b3ab1..7f9f4b5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -36,6 +36,8 @@ import java.util.concurrent.*;
  */
 public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
+    private final static String STREAMS_ID = "TwitterStreamProvider";
+
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
 
     private TwitterStreamConfiguration config;