You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/10 00:07:40 UTC
svn commit: r1566466 - in /incubator/streams/branches/sblackmon:
streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/
streams-core/src/main/java/org/apache/streams/core/
Author: sblackmon
Date: Sun Feb 9 23:07:39 2014
New Revision: 1566466
URL: http://svn.apache.org/r1566466
Log:
This change allows a processor to create multiple activities detected within a single processed document.
Modified:
incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1566466&r1=1566465&r2=1566466&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java Sun Feb 9 23:07:39 2014
@@ -64,6 +64,8 @@ public class KafkaPersistWriter implemen
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
+
+ new Thread(new KafkaPersistWriterTask(this)).start();
}
@Override
Modified: incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java?rev=1566466&r1=1566465&r2=1566466&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java (original)
+++ incubator/streams/branches/sblackmon/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java Sun Feb 9 23:07:39 2014
@@ -20,13 +20,17 @@ public class KafkaPersistWriterTask impl
public void run() {
while(true) {
+ if( writer.getPersistQueue().peek() != null ) {
+ try {
+ StreamsDatum entry = writer.persistQueue.remove();
+ writer.write(entry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
Thread.sleep(new Random().nextInt(100));
- } catch (Exception e) {
- e.printStackTrace();
- }
+ } catch (InterruptedException e) {}
}
}
Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java?rev=1566466&r1=1566465&r2=1566466&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java (original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java Sun Feb 9 23:07:39 2014
@@ -18,7 +18,7 @@
package org.apache.streams.core;
-import java.io.Serializable;
+import java.util.List;
import java.util.Queue;
/**
@@ -35,6 +35,6 @@ public interface StreamsProcessor {
public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue);
public Queue<StreamsDatum> getProcessorOutputQueue();
- public StreamsDatum process( StreamsDatum entry );
+ public List<StreamsDatum> process( StreamsDatum entry );
}
Modified: incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java?rev=1566466&r1=1566465&r2=1566466&view=diff
==============================================================================
--- incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java (original)
+++ incubator/streams/branches/sblackmon/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java Sun Feb 9 23:07:39 2014
@@ -20,7 +20,6 @@ package org.apache.streams.core;
import org.joda.time.DateTime;
-import java.io.Serializable;
import java.math.BigInteger;
import java.util.Queue;
@@ -38,5 +37,4 @@ public interface StreamsProvider {
public StreamsResultSet readNew(BigInteger sequence);
public StreamsResultSet readRange(DateTime start, DateTime end);
-
}