You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/10 00:07:40 UTC

svn commit: r1566466 - in /incubator/streams/branches/sblackmon: streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/ streams-core/src/main/java/org/apache/streams/core/

Author: sblackmon
Date: Sun Feb  9 23:07:39 2014
New Revision: 1566466

URL: http://svn.apache.org/r1566466
Log:
this change allows a processor to create multiple activities detected within a processed document

Modified:
    incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
    incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
    incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
    incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1566466&r1=1566465&r2=1566466&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java Sun Feb  9 23:07:39 2014
@@ -64,6 +64,8 @@ public class KafkaPersistWriter implemen
         ProducerConfig config = new ProducerConfig(props);
 
         producer = new Producer<String, String>(config);
+
+        new Thread(new KafkaPersistWriterTask(this)).start();
     }
 
     @Override

Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java?rev=1566466&r1=1566465&r2=1566466&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java Sun Feb  9 23:07:39 2014
@@ -20,13 +20,17 @@ public class KafkaPersistWriterTask impl
     public void run() {
 
         while(true) {
+            if( writer.getPersistQueue().peek() != null ) {
+                try {
+                    StreamsDatum entry = writer.persistQueue.remove();
+                    writer.write(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
             try {
-                StreamsDatum entry = writer.persistQueue.remove();
-                writer.write(entry);
                 Thread.sleep(new Random().nextInt(100));
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            } catch (InterruptedException e) {}
         }
 
     }

Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java?rev=1566466&r1=1566465&r2=1566466&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java (original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java Sun Feb  9 23:07:39 2014
@@ -18,7 +18,7 @@
 
 package org.apache.streams.core;
 
-import java.io.Serializable;
+import java.util.List;
 import java.util.Queue;
 
 /**
@@ -35,6 +35,6 @@ public interface StreamsProcessor {
     public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue);
     public Queue<StreamsDatum> getProcessorOutputQueue();
 
-    public StreamsDatum process( StreamsDatum entry );
+    public List<StreamsDatum> process( StreamsDatum entry );
 
 }

Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java?rev=1566466&r1=1566465&r2=1566466&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java (original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java Sun Feb  9 23:07:39 2014
@@ -20,7 +20,6 @@ package org.apache.streams.core;
 
 import org.joda.time.DateTime;
 
-import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Queue;
 
@@ -38,5 +37,4 @@ public interface StreamsProvider {
     public StreamsResultSet readNew(BigInteger sequence);
     public StreamsResultSet readRange(DateTime start, DateTime end);
 
-
 }