You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/13 05:23:56 UTC

svn commit: r1567846 - in /incubator/streams/trunk: ./ streams-contrib/streams-persist-console/ streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kaf...

Author: sblackmon
Date: Thu Feb 13 04:23:55 2014
New Revision: 1567846

URL: http://svn.apache.org/r1567846
Log:
Misc updates



Modified:
    incubator/streams/trunk/pom.xml
    incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml
    incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
    incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
    incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java

Modified: incubator/streams/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/pom.xml?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/pom.xml (original)
+++ incubator/streams/trunk/pom.xml Thu Feb 13 04:23:55 2014
@@ -69,7 +69,7 @@
         <jaxb2.version>0.8.3</jaxb2.version>
         <jaxbutil.version>1.2.6</jaxbutil.version>
         <junit.version>4.11</junit.version>
-        <slf4j.version>1.6.1</slf4j.version>
+        <slf4j.version>1.7.6</slf4j.version>
         <logback.version>1.0.9</logback.version>
         <commons-io.version>2.4</commons-io.version>
         <commons-lang3.version>3.1</commons-lang3.version>

Modified: incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-console/pom.xml Thu Feb 13 04:23:55 2014
@@ -22,5 +22,9 @@
             <artifactId>streams-pojo</artifactId>
             <version>0.1-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

Modified: incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java Thu Feb 13 04:23:55 2014
@@ -2,23 +2,19 @@ package org.apache.streams.console;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.StreamsResultSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class ConsolePersistWriter implements StreamsPersistWriter {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    protected volatile Queue<StreamsDatum> persistQueue;
 
     private ObjectMapper mapper = new ObjectMapper();
 
@@ -28,6 +24,7 @@ public class ConsolePersistWriter implem
 
     @Override
     public void start() {
+        Preconditions.checkNotNull(persistQueue);
         new Thread(new ConsolePersistWriterTask(this)).start();
     }
 

Modified: incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java Thu Feb 13 04:23:55 2014
@@ -23,7 +23,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
 
 public class ConsolePersistWriterTask implements Runnable {
 

Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java Thu Feb 13 04:23:55 2014
@@ -18,7 +18,7 @@ import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class KafkaPersistWriter implements StreamsPersistWriter, Serializable {
+public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class);
 
@@ -99,4 +99,11 @@ public class KafkaPersistWriter implemen
             LOGGER.warn("save: {}", e);
         }// put
     }
+
+    @Override
+    public void run() {
+        start();
+
+        // stop();
+    }
 }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java Thu Feb 13 04:23:55 2014
@@ -63,7 +63,7 @@ public class TwitterEventProcessor imple
                 Thread.sleep(new Random().nextInt(100));
                 if(item==TERMINATE) {
                     LOGGER.info("Terminating!");
-                    break;
+                    return;
                 }
 
                 // first check for valid json

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Thu Feb 13 04:23:55 2014
@@ -51,15 +51,11 @@ public class TwitterStreamProvider imple
 
     protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
 
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+    protected volatile Queue<StreamsDatum> providerQueue;
 
     protected StreamingEndpoint endpoint;
     protected BasicClient client;
 
-    public BlockingQueue<Object> getInQueue() {
-        return inQueue;
-    }
-
     protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
@@ -145,6 +141,10 @@ public class TwitterStreamProvider imple
         return this.providerQueue;
     }
 
+    public void setProviderQueue(Queue<StreamsDatum> providerQueue) {
+        this.providerQueue = providerQueue;
+    }
+
     @Override
     public StreamsResultSet readCurrent() {
         return null;

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Thu Feb 13 04:23:55 2014
@@ -90,6 +90,8 @@ public class TwitterTimelineProvider imp
     @Override
     public void start() {
 
+        Preconditions.checkNotNull(providerQueue);
+
         Preconditions.checkNotNull(this.klass);
 
         Preconditions.checkNotNull(config.getOauth().getConsumerKey());
@@ -136,11 +138,7 @@ public class TwitterTimelineProvider imp
             inQueue.add(TwitterEventProcessor.TERMINATE);
         }
 
-        while( !executor.isTerminated()) {
-            try {
-                executor.awaitTermination(1, TimeUnit.SECONDS);
-            } catch (InterruptedException e) { }
-        }
+        shutdownAndAwaitTermination(executor);
     }
 
     @Override
@@ -180,5 +178,23 @@ public class TwitterTimelineProvider imp
 
         stop();
     }
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
 
 }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java Thu Feb 13 04:23:55 2014
@@ -83,7 +83,12 @@ public class TwitterTimelineProviderTask
         }
         while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
 
+        LOGGER.info("Provider Finished.  Cleaning up...");
+
         twitter.shutdown();
+
+        LOGGER.info("Provider Exiting");
+
     }
 
 }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java Thu Feb 13 04:23:55 2014
@@ -22,8 +22,6 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
 /**
 * Created with IntelliJ IDEA.
 * User: mdelaet
@@ -74,13 +72,13 @@ public abstract class TwitterJsonEventAc
 
     }
 
+    public abstract Activity convert(ObjectNode event);
+
     @Override
     public List<Activity> deserializeAll(List<String> serializedList) {
         throw new NotImplementedException("Not currently implemented");
     }
 
-    public abstract Activity convert(ObjectNode event);
-
     public static Date parse(String str) {
         Date date;
         String dstr;
@@ -118,7 +116,7 @@ public abstract class TwitterJsonEventAc
     }
 
     public static void addTwitterExtension(Activity activity, ObjectNode event) {
-        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
         extensions.put("twitter", event);
     }
 

Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java?rev=1567846&r1=1567845&r2=1567846&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java Thu Feb 13 04:23:55 2014
@@ -22,6 +22,7 @@ import org.joda.time.DateTime;
 
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -31,24 +32,28 @@ public class StreamsDatum implements Ser
 
     public StreamsDatum(Object document) {
         this.document = document;
+        this.metadata = new HashMap<String, Object>();
     }
 
     public StreamsDatum(Object document, BigInteger sequenceid) {
 
         this.document = document;
         this.sequenceid = sequenceid;
+        this.metadata = new HashMap<String, Object>();
     }
 
     public StreamsDatum(Object document, DateTime timestamp) {
 
         this.document = document;
         this.timestamp = timestamp;
+        this.metadata = new HashMap<String, Object>();
     }
 
     public StreamsDatum(Object document, DateTime timestamp, BigInteger sequenceid) {
         this.document = document;
         this.timestamp = timestamp;
         this.sequenceid = sequenceid;
+        this.metadata = new HashMap<String, Object>();
     }
 
     public DateTime timestamp;