You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/13 05:23:56 UTC
svn commit: r1567846 - in /incubator/streams/trunk: ./
streams-contrib/streams-persist-console/
streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/
streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kaf...
Author: sblackmon
Date: Thu Feb 13 04:23:55 2014
New Revision: 1567846
URL: http://svn.apache.org/r1567846
Log:
Misc updates
Modified:
incubator/streams/trunk/pom.xml
incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml
incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
Modified: incubator/streams/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/pom.xml?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/pom.xml (original)
+++ incubator/streams/trunk/pom.xml Thu Feb 13 04:23:55 2014
@@ -69,7 +69,7 @@
<jaxb2.version>0.8.3</jaxb2.version>
<jaxbutil.version>1.2.6</jaxbutil.version>
<junit.version>4.11</junit.version>
- <slf4j.version>1.6.1</slf4j.version>
+ <slf4j.version>1.7.6</slf4j.version>
<logback.version>1.0.9</logback.version>
<commons-io.version>2.4</commons-io.version>
<commons-lang3.version>3.1</commons-lang3.version>
Modified: incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml Thu Feb 13 04:23:55 2014
@@ -22,5 +22,9 @@
<artifactId>streams-pojo</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
Modified: incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java Thu Feb 13 04:23:55 2014
@@ -2,23 +2,19 @@ package org.apache.streams.console;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.StreamsResultSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
public class ConsolePersistWriter implements StreamsPersistWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
- protected volatile Queue<StreamsDatum> persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ protected volatile Queue<StreamsDatum> persistQueue;
private ObjectMapper mapper = new ObjectMapper();
@@ -28,6 +24,7 @@ public class ConsolePersistWriter implem
@Override
public void start() {
+ Preconditions.checkNotNull(persistQueue);
new Thread(new ConsolePersistWriterTask(this)).start();
}
Modified: incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java Thu Feb 13 04:23:55 2014
@@ -23,7 +23,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
public class ConsolePersistWriterTask implements Runnable {
Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java Thu Feb 13 04:23:55 2014
@@ -18,7 +18,7 @@ import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
+public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class);
@@ -99,4 +99,11 @@ public class KafkaPersistWriter implemen
LOGGER.warn("save: {}", e);
}// put
}
+
+ @Override
+ public void run() {
+ start();
+
+ // stop();
+ }
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java Thu Feb 13 04:23:55 2014
@@ -63,7 +63,7 @@ public class TwitterEventProcessor imple
Thread.sleep(new Random().nextInt(100));
if(item==TERMINATE) {
LOGGER.info("Terminating!");
- break;
+ return;
}
// first check for valid json
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Thu Feb 13 04:23:55 2014
@@ -51,15 +51,11 @@ public class TwitterStreamProvider imple
protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+ protected volatile Queue<StreamsDatum> providerQueue;
protected StreamingEndpoint endpoint;
protected BasicClient client;
- public BlockingQueue<Object> getInQueue() {
- return inQueue;
- }
-
protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
@@ -145,6 +141,10 @@ public class TwitterStreamProvider imple
return this.providerQueue;
}
+ public void setProviderQueue(Queue<StreamsDatum> providerQueue) {
+ this.providerQueue = providerQueue;
+ }
+
@Override
public StreamsResultSet readCurrent() {
return null;
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Thu Feb 13 04:23:55 2014
@@ -90,6 +90,8 @@ public class TwitterTimelineProvider imp
@Override
public void start() {
+ Preconditions.checkNotNull(providerQueue);
+
Preconditions.checkNotNull(this.klass);
Preconditions.checkNotNull(config.getOauth().getConsumerKey());
@@ -136,11 +138,7 @@ public class TwitterTimelineProvider imp
inQueue.add(TwitterEventProcessor.TERMINATE);
}
- while( !executor.isTerminated()) {
- try {
- executor.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) { }
- }
+ shutdownAndAwaitTermination(executor);
}
@Override
@@ -180,5 +178,23 @@ public class TwitterTimelineProvider imp
stop();
}
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+ System.err.println("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java Thu Feb 13 04:23:55 2014
@@ -83,7 +83,12 @@ public class TwitterTimelineProviderTask
}
while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
+ LOGGER.info("Provider Finished. Cleaning up...");
+
twitter.shutdown();
+
+ LOGGER.info("Provider Exiting");
+
}
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java Thu Feb 13 04:23:55 2014
@@ -22,8 +22,6 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
/**
* Created with IntelliJ IDEA.
* User: mdelaet
@@ -74,13 +72,13 @@ public abstract class TwitterJsonEventAc
}
+ public abstract Activity convert(ObjectNode event);
+
@Override
public List<Activity> deserializeAll(List<String> serializedList) {
throw new NotImplementedException("Not currently implemented");
}
- public abstract Activity convert(ObjectNode event);
-
public static Date parse(String str) {
Date date;
String dstr;
@@ -118,7 +116,7 @@ public abstract class TwitterJsonEventAc
}
public static void addTwitterExtension(Activity activity, ObjectNode event) {
- Map<String, Object> extensions = ensureExtensions(activity);
+ Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
extensions.put("twitter", event);
}
Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java Thu Feb 13 04:23:55 2014
@@ -22,6 +22,7 @@ import org.joda.time.DateTime;
import java.io.Serializable;
import java.math.BigInteger;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -31,24 +32,28 @@ public class StreamsDatum implements Ser
public StreamsDatum(Object document) {
this.document = document;
+ this.metadata = new HashMap<String, Object>();
}
public StreamsDatum(Object document, BigInteger sequenceid) {
this.document = document;
this.sequenceid = sequenceid;
+ this.metadata = new HashMap<String, Object>();
}
public StreamsDatum(Object document, DateTime timestamp) {
this.document = document;
this.timestamp = timestamp;
+ this.metadata = new HashMap<String, Object>();
}
public StreamsDatum(Object document, DateTime timestamp, BigInteger sequenceid) {
this.document = document;
this.timestamp = timestamp;
this.sequenceid = sequenceid;
+ this.metadata = new HashMap<String, Object>();
}
public DateTime timestamp;