You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/10 18:58:18 UTC

svn commit: r1566690 - in /incubator/streams/trunk: ./ streams-config/ streams-contrib/ streams-contrib/streams-persist-hdfs/ streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/ streams-contrib/streams-provider-facebook/src/ma...

Author: sblackmon
Date: Mon Feb 10 17:58:17 2014
New Revision: 1566690

URL: http://svn.apache.org/r1566690
Log:
svn ci -m "MERGE streams sblackmon [1556700]:[1566685] into trunk"

Added:
    incubator/streams/trunk/streams-contrib/streams-persist-hdfs/
      - copied from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-persist-hdfs/
    incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
      - copied unchanged from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
      - copied unchanged from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
      - copied unchanged from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
      - copied unchanged from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
Modified:
    incubator/streams/trunk/   (props changed)
    incubator/streams/trunk/pom.xml
    incubator/streams/trunk/streams-config/pom.xml
    incubator/streams/trunk/streams-contrib/pom.xml
    incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
    incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
    incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml
    incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
    incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
    incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
    incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
    incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java

Propchange: incubator/streams/trunk/
------------------------------------------------------------------------------
  Merged /incubator/streams/branches/sblackmon:r1559230-1566685

Modified: incubator/streams/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/pom.xml (original)
+++ incubator/streams/trunk/pom.xml Mon Feb 10 17:58:17 2014
@@ -48,6 +48,10 @@
           <id>clojars.org</id>
           <url>http://clojars.org/repo</url>
         </repository>
+        <repository>
+            <id>cloudera</id>
+            <url>https://repository.cloudera.com/artifactory/repo</url>
+        </repository>
     </repositories>
 
     <properties>
@@ -69,6 +73,7 @@
         <logback.version>1.0.9</logback.version>
         <commons-io.version>2.4</commons-io.version>
         <commons-lang3.version>3.1</commons-lang3.version>
+        <typesafe.config.version>1.2.0</typesafe.config.version>
         <guava.version>15.0</guava.version>
         <scala.version>2.8.0</scala.version>
         <clojure.version>1.4.0</clojure.version>
@@ -174,6 +179,11 @@
                 <type>jar</type>
                 <scope>compile</scope>
             </dependency>
+            <dependency>
+                <groupId>com.typesafe</groupId>
+                <artifactId>config</artifactId>
+                <version>${typesafe.config.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>junit</groupId>

Modified: incubator/streams/trunk/streams-config/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-config/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-config/pom.xml (original)
+++ incubator/streams/trunk/streams-config/pom.xml Mon Feb 10 17:58:17 2014
@@ -34,7 +34,7 @@
         <dependency>
             <groupId>com.typesafe</groupId>
             <artifactId>config</artifactId>
-            <version>1.0.2</version>
+            <version>1.2.0</version>
         </dependency>
         <!--<dependency>-->
             <!--<groupId>commons-configuration</groupId>-->

Modified: incubator/streams/trunk/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/pom.xml Mon Feb 10 17:58:17 2014
@@ -37,6 +37,7 @@
 
     <modules>
         <module>streams-persist-console</module>
+        <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>

Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java Mon Feb 10 17:58:17 2014
@@ -64,6 +64,8 @@ public class KafkaPersistWriter implemen
         ProducerConfig config = new ProducerConfig(props);
 
         producer = new Producer<String, String>(config);
+
+        new Thread(new KafkaPersistWriterTask(this)).start();
     }
 
     @Override

Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java Mon Feb 10 17:58:17 2014
@@ -20,13 +20,17 @@ public class KafkaPersistWriterTask impl
     public void run() {
 
         while(true) {
+            if( writer.getPersistQueue().peek() != null ) {
+                try {
+                    StreamsDatum entry = writer.persistQueue.remove();
+                    writer.write(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
             try {
-                StreamsDatum entry = writer.persistQueue.remove();
-                writer.write(entry);
                 Thread.sleep(new Random().nextInt(100));
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            } catch (InterruptedException e) {}
         }
 
     }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.Js
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.pojo.json.*;
-import org.joda.time.DateTime;
 
 import java.text.DateFormat;
 import java.text.ParseException;
@@ -36,7 +35,7 @@ import static org.apache.streams.data.ut
 /**
  * Serializes activity posts
  */
-public class FacebookPostActivitySerializer implements ActivitySerializer {
+public class FacebookPostActivitySerializer implements ActivitySerializer<String> {
 
     public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
     public static final String PROVIDER_NAME = "facebook";
@@ -66,8 +65,7 @@ public class FacebookPostActivitySeriali
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
-        //TODO Support
+    public List<Activity> deserializeAll(List<String> serializedList) {
         throw new NotImplementedException("Not currently supported by this deserializer");
     }
 

Modified: incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -20,7 +20,7 @@ import java.util.Map;
  * Time: 10:48 AM
  * To change this template use File | Settings | File Templates.
  */
-public class GPlusActivitySerializer implements ActivitySerializer {
+public class GPlusActivitySerializer implements ActivitySerializer<String> {
     private final static Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
 
     @Override
@@ -68,7 +68,7 @@ public class GPlusActivitySerializer imp
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
+    public List<Activity> deserializeAll(List<String> serializedList) {
         //TODO Support
         throw new NotImplementedException("Not currently supported by this deserializer");
     }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -21,7 +21,7 @@ import java.util.Map;
  * Time: 8:32 AM
  * To change this template use File | Settings | File Templates.
  */
-public class RedditActivitySerializer implements ActivitySerializer{
+public class RedditActivitySerializer implements ActivitySerializer<String> {
     private final static Logger LOGGER = LoggerFactory.getLogger(RedditActivitySerializer.class);
 
     public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
@@ -82,7 +82,7 @@ public class RedditActivitySerializer im
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
+    public List<Activity> deserializeAll(List<String> serializedList) {
         //TODO Support
         throw new NotImplementedException("Not currently supported by this deserializer");
     }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -35,7 +35,7 @@ import java.util.List;
  * Time: 3:07 PM
  * To change this template use File | Settings | File Templates.
  */
-public class ActivityXMLActivitySerializer implements ActivitySerializer {
+public class ActivityXMLActivitySerializer implements ActivitySerializer<String> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ActivityXMLActivitySerializer.class);
 
@@ -78,8 +78,7 @@ public class ActivityXMLActivitySerializ
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
-        //TODO Support
+    public List<Activity> deserializeAll(List<String> serializedList) {
         throw new NotImplementedException("Not currently supported by this deserializer");
     }
 

Modified: incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -19,7 +19,7 @@ import java.util.List;
  * Time: 2:29 PM
  * To change this template use File | Settings | File Templates.
  */
-public class PowerTrackActivitySerializer implements ActivitySerializer {
+public class PowerTrackActivitySerializer implements ActivitySerializer<String> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(PowerTrackActivitySerializer.class);
 
@@ -96,8 +96,8 @@ public class PowerTrackActivitySerialize
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
-        //TODO Support
+    public List<Activity> deserializeAll(List<String> serializedList) {
         throw new NotImplementedException("Not currently supported by this deserializer");
     }
+
 }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml Mon Feb 10 17:58:17 2014
@@ -12,7 +12,14 @@
     <artifactId>streams-provider-moreover</artifactId>
 
     <dependencies>
-
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>

Modified: incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -2,7 +2,6 @@ package org.apache.streams.data;
 
 import com.fasterxml.jackson.databind.AnnotationIntrospector;
 import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
@@ -17,7 +16,7 @@ import java.util.List;
 /**
  * Deserializes Moreover JSON format into Activities
  */
-public class MoreoverJsonActivitySerializer implements ActivitySerializer {
+public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> {
     @Override
     public String serializationFormat() {
         return "application/json+vnd.moreover.com.v1";
@@ -64,7 +63,7 @@ public class MoreoverJsonActivitySeriali
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
+    public List<Activity> deserializeAll(List<String> serializedList) {
         throw new NotImplementedException("Not currently implemented");
     }
 

Modified: incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -12,14 +12,13 @@ import javax.xml.bind.JAXBElement;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 import java.io.StringReader;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
 /**
  * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
  */
-public class MoreoverXmlActivitySerializer implements ActivitySerializer {
+public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
 
     //JAXBContext is threadsafe (supposedly)
     private final JAXBContext articleContext;
@@ -47,11 +46,13 @@ public class MoreoverXmlActivitySerializ
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
-        ArticlesResponse response = deserializeMoreoverResponse(serializedList);
+    public List<Activity> deserializeAll(List<String> serializedList) {
         List<Activity> activities = new LinkedList<Activity>();
-        for(Article article : response.getArticles().getArticle()) {
-            activities.add(MoreoverUtils.convert(article));
+        for(String item : serializedList) {
+            ArticlesResponse response = deserializeMoreoverResponse(item);
+            for(Article article : response.getArticles().getArticle()) {
+                activities.add(MoreoverUtils.convert(article));
+            }
         }
         return activities;
     }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java Mon Feb 10 17:58:17 2014
@@ -1,6 +1,7 @@
 package org.apache.streams.data;
 
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.IOUtils;
 import org.apache.streams.pojo.json.Activity;
 import org.junit.Before;
@@ -12,10 +13,7 @@ import java.io.StringWriter;
 import java.nio.charset.Charset;
 import java.util.List;
 
-import static java.util.regex.Pattern.matches;
 import static org.apache.streams.data.util.MoreoverTestUtil.test;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
 
 public class MoreoverXmlActivitySerializerTest {
     ActivitySerializer serializer;
@@ -29,7 +27,7 @@ public class MoreoverXmlActivitySerializ
 
     @Test
     public void loadData() throws Exception {
-        List<Activity> activities = serializer.deserializeAll(xml);
+        List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
         for (Activity activity : activities) {
             test(activity);
         }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -9,31 +9,21 @@ import org.apache.streams.pojo.json.*;
 import java.util.List;
 
 /**
- * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
+ * Deserializes the Rome SyndEntry POJO and converts it to an instance of {@link Activity}
  */
-public class SyndEntryActivitySerializer implements ActivitySerializer {
-
+public class SyndEntryActivitySerializer implements ActivitySerializer<SyndEntry> {
 
     @Override
     public String serializationFormat() {
-        return "application/xml+vnd.moreover.com.v1";
-    }
-
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
+        return "application/streams-provider-rss";
     }
 
     @Override
-    public Activity deserialize(String serialized) {
-        return null;
+    public SyndEntry serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Rome");
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
-        return null;
-    }
-
     public Activity deserialize(SyndEntry serialized) {
         Preconditions.checkNotNull(serialized);
         Activity activity = new Activity();
@@ -61,6 +51,7 @@ public class SyndEntryActivitySerializer
         return activity;
     }
 
+    @Override
     public List<Activity> deserializeAll(List<SyndEntry> serializedList) {
         List<Activity> activityList = Lists.newArrayList();
         for(SyndEntry entry : serializedList) {

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml Mon Feb 10 17:58:17 2014
@@ -15,7 +15,6 @@
         <dependency>
             <groupId>com.typesafe</groupId>
             <artifactId>config</artifactId>
-            <version>1.0.2</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
@@ -63,8 +62,13 @@
         </dependency>
         <dependency>
             <groupId>com.twitter</groupId>
-            <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
-            <version>1.4.2</version> <!-- or whatever the latest version is -->
+            <artifactId>hbc-core</artifactId>
+            <version>1.4.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.twitter4j</groupId>
+            <artifactId>twitter4j-core</artifactId>
+            <version>3.0.5</version>
         </dependency>
         <!--<dependency>-->
             <!--<groupId>com.twitter</groupId>-->

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java Mon Feb 10 17:58:17 2014
@@ -16,7 +16,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java Mon Feb 10 17:58:17 2014
@@ -1,5 +1,6 @@
 package org.apache.streams.twitter.provider;
 
+import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigException;
 import org.apache.streams.config.StreamsConfigurator;
@@ -8,6 +9,8 @@ import org.apache.streams.twitter.Twitte
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 /**
  * Created by sblackmon on 12/10/13.
  */
@@ -19,6 +22,10 @@ public class TwitterStreamConfigurator {
         Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth");
 
         TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
+        twitterStreamConfiguration.setProtocol(twitter.getString("protocol"));
+        twitterStreamConfiguration.setHost(twitter.getString("host"));
+        twitterStreamConfiguration.setPort(twitter.getLong("port"));
+        twitterStreamConfiguration.setVersion(twitter.getString("version"));
         TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration();
         twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
         twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
@@ -30,11 +37,16 @@ public class TwitterStreamConfigurator {
             twitterStreamConfiguration.setTrack(twitter.getStringList("track"));
         } catch( ConfigException ce ) {}
         try {
-            twitterStreamConfiguration.setFollow(twitter.getLongList("follow"));
+            List<Long> follows = Lists.newArrayList();
+            for( Integer id : twitter.getIntList("follow"))
+                follows.add(new Long(id));
+            twitterStreamConfiguration.setFollow(follows);
         } catch( ConfigException ce ) {}
 
         twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
         twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
+        twitterStreamConfiguration.setJsonStoreEnabled("true");
+        twitterStreamConfiguration.setIncludeEntities("true");
 
         return twitterStreamConfiguration;
     }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Mon Feb 10 17:58:17 2014
@@ -33,7 +33,7 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterStreamProvider /*extends BaseRichSpout*/ implements StreamsProvider, Serializable {
+public class TwitterStreamProvider implements StreamsProvider, Serializable, Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
 
@@ -51,7 +51,7 @@ public class TwitterStreamProvider /*ext
 
     protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
 
-    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
 
     protected StreamingEndpoint endpoint;
     protected BasicClient client;
@@ -159,44 +159,19 @@ public class TwitterStreamProvider /*ext
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         return null;
     }
-//
-//    @Override
-//    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-//        outputFieldsDeclarer.declare(new Fields("document"));
-//        outputFieldsDeclarer.declare(new Fields("type"));
-//    }
-//
-//    @Override
-//    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-//        collector = spoutOutputCollector;
-//        run();
-//    }
-//
-//    @Override
-//    public void nextTuple() {
-//        try {
-//            collector.emit( new Values(outQueue.take(), klass) );
-//        } catch (InterruptedException e) {
-//            e.printStackTrace();
-//        }
-//
-//    }
 
-    public class TwitterStreamCloser implements Runnable {
-
-        BlockingQueue<String> queue;
+    @Override
+    public void run() {
 
-        public TwitterStreamCloser(BlockingQueue<String> queue) {
-            this.queue = queue;
-        }
+        start();
 
-        public void run() {
-            for (int i = 0; i < 10; i++) {
-                queue.add(TwitterEventProcessor.TERMINATE);
-            }
+        while( !executor.isTerminated()) {
+            try {
+                executor.awaitTermination(1, TimeUnit.SECONDS);
+            } catch (InterruptedException e) { }
         }
 
+        stop();
     }
 
-
 }

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -13,7 +13,6 @@ import org.apache.streams.pojo.json.Acti
 import org.apache.streams.pojo.json.Generator;
 import org.apache.streams.pojo.json.Icon;
 import org.apache.streams.pojo.json.Provider;
-import org.joda.time.DateTime;
 
 import java.io.IOException;
 import java.text.DateFormat;
@@ -32,7 +31,7 @@ import static org.apache.streams.data.ut
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer {
+public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
 
     public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
 
@@ -76,7 +75,7 @@ public abstract class TwitterJsonEventAc
     }
 
     @Override
-    public List<Activity> deserializeAll(String serializedList) {
+    public List<Activity> deserializeAll(List<String> serializedList) {
         throw new NotImplementedException("Not currently implemented");
     }
 

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -22,7 +22,7 @@ import static org.apache.streams.data.ut
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer {
+public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
 
     public Activity convert(ObjectNode event) {
 

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -21,7 +21,7 @@ import static org.apache.streams.data.ut
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer {
+public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
 
     public Activity convert(ObjectNode event) {
 

Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf Mon Feb 10 17:58:17 2014
@@ -8,4 +8,5 @@ twitter {
     oauth {
         appName = "Apache Streams"
     }
+
 }
\ No newline at end of file

Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java Mon Feb 10 17:58:17 2014
@@ -18,7 +18,7 @@
 
 package org.apache.streams.core;
 
-import java.io.Serializable;
+import java.util.List;
 import java.util.Queue;
 
 /**
@@ -35,6 +35,6 @@ public interface StreamsProcessor {
     public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue);
     public Queue<StreamsDatum> getProcessorOutputQueue();
 
-    public StreamsDatum process( StreamsDatum entry );
+    public List<StreamsDatum> process( StreamsDatum entry );
 
 }

Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java Mon Feb 10 17:58:17 2014
@@ -20,7 +20,6 @@ package org.apache.streams.core;
 
 import org.joda.time.DateTime;
 
-import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Queue;
 
@@ -38,5 +37,4 @@ public interface StreamsProvider {
     public StreamsResultSet readNew(BigInteger sequence);
     public StreamsResultSet readRange(DateTime start, DateTime end);
 
-
 }

Modified: incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -23,9 +23,9 @@ import org.apache.streams.pojo.json.Acti
 import java.util.List;
 
 /**
- * Serializes and deserializes Activities from a String
+ * Serializes and deserializes Activities
  */
-public interface ActivitySerializer {
+public interface ActivitySerializer<T> {
 
     /**
      * Gets the supported content type that can be deserialized/serialized
@@ -35,24 +35,24 @@ public interface ActivitySerializer {
     String serializationFormat();
 
     /**
-     * Converts the activity to a String representation.
+     * Converts the activity to a POJO representation.
      *
      * @param deserialized the string
      * @return a fully populated Activity object
      */
-    String serialize(Activity deserialized);
+    T serialize(Activity deserialized);
 
     /**
-     * Converts a string into an Activity
+     * Converts a POJO into an Activity
      * @param serialized the string representation
      * @return a fully populated Activity object
      */
-    Activity deserialize(String serialized);
+    Activity deserialize(T serialized);
 
     /**
-     * Converts a string representing multiple activities into a list of Activity objects
-     * @param serializedList a String representation of a List
+     * Converts multiple documents into a list of Activity objects
+     * @param serializedList a typed List of documents
      * @return a list of fully populated activities
      */
-    List<Activity> deserializeAll(String serializedList);
+    List<Activity> deserializeAll(List<T> serializedList);
 }