You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/10 18:58:18 UTC
svn commit: r1566690 - in /incubator/streams/trunk: ./ streams-config/
streams-contrib/ streams-contrib/streams-persist-hdfs/
streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/
streams-contrib/streams-provider-facebook/src/ma...
Author: sblackmon
Date: Mon Feb 10 17:58:17 2014
New Revision: 1566690
URL: http://svn.apache.org/r1566690
Log:
svn ci -m "MERGE streams sblackmon [1556700]:[1566685] into trunk"
Added:
incubator/streams/trunk/streams-contrib/streams-persist-hdfs/
- copied from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-persist-hdfs/
incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
- copied unchanged from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
- copied unchanged from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
- copied unchanged from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
- copied unchanged from r1566685, incubator/streams/branches/sblackmon/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
Modified:
incubator/streams/trunk/ (props changed)
incubator/streams/trunk/pom.xml
incubator/streams/trunk/streams-config/pom.xml
incubator/streams/trunk/streams-contrib/pom.xml
incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml
incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
Propchange: incubator/streams/trunk/
------------------------------------------------------------------------------
Merged /incubator/streams/branches/sblackmon:r1559230-1566685
Modified: incubator/streams/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/pom.xml (original)
+++ incubator/streams/trunk/pom.xml Mon Feb 10 17:58:17 2014
@@ -48,6 +48,10 @@
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
+ <repository>
+ <id>cloudera</id>
+ <url>https://repository.cloudera.com/artifactory/repo</url>
+ </repository>
</repositories>
<properties>
@@ -69,6 +73,7 @@
<logback.version>1.0.9</logback.version>
<commons-io.version>2.4</commons-io.version>
<commons-lang3.version>3.1</commons-lang3.version>
+ <typesafe.config.version>1.2.0</typesafe.config.version>
<guava.version>15.0</guava.version>
<scala.version>2.8.0</scala.version>
<clojure.version>1.4.0</clojure.version>
@@ -174,6 +179,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <version>${typesafe.config.version}</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
Modified: incubator/streams/trunk/streams-config/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-config/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-config/pom.xml (original)
+++ incubator/streams/trunk/streams-config/pom.xml Mon Feb 10 17:58:17 2014
@@ -34,7 +34,7 @@
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
- <version>1.0.2</version>
+ <version>1.2.0</version>
</dependency>
<!--<dependency>-->
<!--<groupId>commons-configuration</groupId>-->
Modified: incubator/streams/trunk/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/pom.xml Mon Feb 10 17:58:17 2014
@@ -37,6 +37,7 @@
<modules>
<module>streams-persist-console</module>
+ <module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java Mon Feb 10 17:58:17 2014
@@ -64,6 +64,8 @@ public class KafkaPersistWriter implemen
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
+
+ new Thread(new KafkaPersistWriterTask(this)).start();
}
@Override
Modified: incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java Mon Feb 10 17:58:17 2014
@@ -20,13 +20,17 @@ public class KafkaPersistWriterTask impl
public void run() {
while(true) {
+ if( writer.getPersistQueue().peek() != null ) {
+ try {
+ StreamsDatum entry = writer.persistQueue.remove();
+ writer.write(entry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
Thread.sleep(new Random().nextInt(100));
- } catch (Exception e) {
- e.printStackTrace();
- }
+ } catch (InterruptedException e) {}
}
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.Js
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.pojo.json.*;
-import org.joda.time.DateTime;
import java.text.DateFormat;
import java.text.ParseException;
@@ -36,7 +35,7 @@ import static org.apache.streams.data.ut
/**
* Serializes activity posts
*/
-public class FacebookPostActivitySerializer implements ActivitySerializer {
+public class FacebookPostActivitySerializer implements ActivitySerializer<String> {
public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
public static final String PROVIDER_NAME = "facebook";
@@ -66,8 +65,7 @@ public class FacebookPostActivitySeriali
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
- //TODO Support
+ public List<Activity> deserializeAll(List<String> serializedList) {
throw new NotImplementedException("Not currently supported by this deserializer");
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-googleplus/src/main/java/com/gplus/api/GPlusActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -20,7 +20,7 @@ import java.util.Map;
* Time: 10:48 AM
* To change this template use File | Settings | File Templates.
*/
-public class GPlusActivitySerializer implements ActivitySerializer {
+public class GPlusActivitySerializer implements ActivitySerializer<String> {
private final static Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
@Override
@@ -68,7 +68,7 @@ public class GPlusActivitySerializer imp
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
+ public List<Activity> deserializeAll(List<String> serializedList) {
//TODO Support
throw new NotImplementedException("Not currently supported by this deserializer");
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-edc-reddit/src/main/java/com/reddit/api/RedditActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -21,7 +21,7 @@ import java.util.Map;
* Time: 8:32 AM
* To change this template use File | Settings | File Templates.
*/
-public class RedditActivitySerializer implements ActivitySerializer{
+public class RedditActivitySerializer implements ActivitySerializer<String> {
private final static Logger LOGGER = LoggerFactory.getLogger(RedditActivitySerializer.class);
public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
@@ -82,7 +82,7 @@ public class RedditActivitySerializer im
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
+ public List<Activity> deserializeAll(List<String> serializedList) {
//TODO Support
throw new NotImplementedException("Not currently supported by this deserializer");
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/ActivityXMLActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -35,7 +35,7 @@ import java.util.List;
* Time: 3:07 PM
* To change this template use File | Settings | File Templates.
*/
-public class ActivityXMLActivitySerializer implements ActivitySerializer {
+public class ActivityXMLActivitySerializer implements ActivitySerializer<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(ActivityXMLActivitySerializer.class);
@@ -78,8 +78,7 @@ public class ActivityXMLActivitySerializ
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
- //TODO Support
+ public List<Activity> deserializeAll(List<String> serializedList) {
throw new NotImplementedException("Not currently supported by this deserializer");
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-gnip/gnip-powertrack/src/main/java/org/apache/streams/gnip/powertrack/PowerTrackActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -19,7 +19,7 @@ import java.util.List;
* Time: 2:29 PM
* To change this template use File | Settings | File Templates.
*/
-public class PowerTrackActivitySerializer implements ActivitySerializer {
+public class PowerTrackActivitySerializer implements ActivitySerializer<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(PowerTrackActivitySerializer.class);
@@ -96,8 +96,8 @@ public class PowerTrackActivitySerialize
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
- //TODO Support
+ public List<Activity> deserializeAll(List<String> serializedList) {
throw new NotImplementedException("Not currently supported by this deserializer");
}
+
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-moreover/pom.xml Mon Feb 10 17:58:17 2014
@@ -12,7 +12,14 @@
<artifactId>streams-provider-moreover</artifactId>
<dependencies>
-
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Modified: incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -2,7 +2,6 @@ package org.apache.streams.data;
import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
@@ -17,7 +16,7 @@ import java.util.List;
/**
* Deserializes Moreover JSON format into Activities
*/
-public class MoreoverJsonActivitySerializer implements ActivitySerializer {
+public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> {
@Override
public String serializationFormat() {
return "application/json+vnd.moreover.com.v1";
@@ -64,7 +63,7 @@ public class MoreoverJsonActivitySeriali
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
+ public List<Activity> deserializeAll(List<String> serializedList) {
throw new NotImplementedException("Not currently implemented");
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -12,14 +12,13 @@ import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.StringReader;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
* Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
*/
-public class MoreoverXmlActivitySerializer implements ActivitySerializer {
+public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
//JAXBContext is threadsafe (supposedly)
private final JAXBContext articleContext;
@@ -47,11 +46,13 @@ public class MoreoverXmlActivitySerializ
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
- ArticlesResponse response = deserializeMoreoverResponse(serializedList);
+ public List<Activity> deserializeAll(List<String> serializedList) {
List<Activity> activities = new LinkedList<Activity>();
- for(Article article : response.getArticles().getArticle()) {
- activities.add(MoreoverUtils.convert(article));
+ for(String item : serializedList) {
+ ArticlesResponse response = deserializeMoreoverResponse(item);
+ for(Article article : response.getArticles().getArticle()) {
+ activities.add(MoreoverUtils.convert(article));
+ }
}
return activities;
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java Mon Feb 10 17:58:17 2014
@@ -1,6 +1,7 @@
package org.apache.streams.data;
+import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.streams.pojo.json.Activity;
import org.junit.Before;
@@ -12,10 +13,7 @@ import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.List;
-import static java.util.regex.Pattern.matches;
import static org.apache.streams.data.util.MoreoverTestUtil.test;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
public class MoreoverXmlActivitySerializerTest {
ActivitySerializer serializer;
@@ -29,7 +27,7 @@ public class MoreoverXmlActivitySerializ
@Test
public void loadData() throws Exception {
- List<Activity> activities = serializer.deserializeAll(xml);
+ List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
for (Activity activity : activities) {
test(activity);
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -9,31 +9,21 @@ import org.apache.streams.pojo.json.*;
import java.util.List;
/**
- * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
+ * Deserializes the Rome SyndEntry POJO and converts it to an instance of {@link Activity}
*/
-public class SyndEntryActivitySerializer implements ActivitySerializer {
-
+public class SyndEntryActivitySerializer implements ActivitySerializer<SyndEntry> {
@Override
public String serializationFormat() {
- return "application/xml+vnd.moreover.com.v1";
- }
-
- @Override
- public String serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
+ return "application/streams-provider-rss";
}
@Override
- public Activity deserialize(String serialized) {
- return null;
+ public SyndEntry serialize(Activity deserialized) {
+ throw new UnsupportedOperationException("Cannot currently serialize to Rome");
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
- return null;
- }
-
public Activity deserialize(SyndEntry serialized) {
Preconditions.checkNotNull(serialized);
Activity activity = new Activity();
@@ -61,6 +51,7 @@ public class SyndEntryActivitySerializer
return activity;
}
+ @Override
public List<Activity> deserializeAll(List<SyndEntry> serializedList) {
List<Activity> activityList = Lists.newArrayList();
for(SyndEntry entry : serializedList) {
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/pom.xml Mon Feb 10 17:58:17 2014
@@ -15,7 +15,6 @@
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
- <version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -63,8 +62,13 @@
</dependency>
<dependency>
<groupId>com.twitter</groupId>
- <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
- <version>1.4.2</version> <!-- or whatever the latest version is -->
+ <artifactId>hbc-core</artifactId>
+ <version>1.4.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
+ <version>3.0.5</version>
</dependency>
<!--<dependency>-->
<!--<groupId>com.twitter</groupId>-->
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java Mon Feb 10 17:58:17 2014
@@ -16,7 +16,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.Serializable;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java Mon Feb 10 17:58:17 2014
@@ -1,5 +1,6 @@
package org.apache.streams.twitter.provider;
+import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import org.apache.streams.config.StreamsConfigurator;
@@ -8,6 +9,8 @@ import org.apache.streams.twitter.Twitte
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
* Created by sblackmon on 12/10/13.
*/
@@ -19,6 +22,10 @@ public class TwitterStreamConfigurator {
Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth");
TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
+ twitterStreamConfiguration.setProtocol(twitter.getString("protocol"));
+ twitterStreamConfiguration.setHost(twitter.getString("host"));
+ twitterStreamConfiguration.setPort(twitter.getLong("port"));
+ twitterStreamConfiguration.setVersion(twitter.getString("version"));
TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration();
twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
@@ -30,11 +37,16 @@ public class TwitterStreamConfigurator {
twitterStreamConfiguration.setTrack(twitter.getStringList("track"));
} catch( ConfigException ce ) {}
try {
- twitterStreamConfiguration.setFollow(twitter.getLongList("follow"));
+ List<Long> follows = Lists.newArrayList();
+ for( Integer id : twitter.getIntList("follow"))
+ follows.add(new Long(id));
+ twitterStreamConfiguration.setFollow(follows);
} catch( ConfigException ce ) {}
twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
+ twitterStreamConfiguration.setJsonStoreEnabled("true");
+ twitterStreamConfiguration.setIncludeEntities("true");
return twitterStreamConfiguration;
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Mon Feb 10 17:58:17 2014
@@ -33,7 +33,7 @@ import java.util.concurrent.*;
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterStreamProvider /*extends BaseRichSpout*/ implements StreamsProvider, Serializable {
+public class TwitterStreamProvider implements StreamsProvider, Serializable, Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
@@ -51,7 +51,7 @@ public class TwitterStreamProvider /*ext
protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
- protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
protected StreamingEndpoint endpoint;
protected BasicClient client;
@@ -159,44 +159,19 @@ public class TwitterStreamProvider /*ext
public StreamsResultSet readRange(DateTime start, DateTime end) {
return null;
}
-//
-// @Override
-// public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-// outputFieldsDeclarer.declare(new Fields("document"));
-// outputFieldsDeclarer.declare(new Fields("type"));
-// }
-//
-// @Override
-// public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-// collector = spoutOutputCollector;
-// run();
-// }
-//
-// @Override
-// public void nextTuple() {
-// try {
-// collector.emit( new Values(outQueue.take(), klass) );
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
-//
-// }
- public class TwitterStreamCloser implements Runnable {
-
- BlockingQueue<String> queue;
+ @Override
+ public void run() {
- public TwitterStreamCloser(BlockingQueue<String> queue) {
- this.queue = queue;
- }
+ start();
- public void run() {
- for (int i = 0; i < 10; i++) {
- queue.add(TwitterEventProcessor.TERMINATE);
- }
+ while( !executor.isTerminated()) {
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) { }
}
+ stop();
}
-
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -13,7 +13,6 @@ import org.apache.streams.pojo.json.Acti
import org.apache.streams.pojo.json.Generator;
import org.apache.streams.pojo.json.Icon;
import org.apache.streams.pojo.json.Provider;
-import org.joda.time.DateTime;
import java.io.IOException;
import java.text.DateFormat;
@@ -32,7 +31,7 @@ import static org.apache.streams.data.ut
* Time: 9:24 AM
* To change this template use File | Settings | File Templates.
*/
-public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer {
+public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
@@ -76,7 +75,7 @@ public abstract class TwitterJsonEventAc
}
@Override
- public List<Activity> deserializeAll(String serializedList) {
+ public List<Activity> deserializeAll(List<String> serializedList) {
throw new NotImplementedException("Not currently implemented");
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -22,7 +22,7 @@ import static org.apache.streams.data.ut
* Time: 9:24 AM
* To change this template use File | Settings | File Templates.
*/
-public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer {
+public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
public Activity convert(ObjectNode event) {
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -21,7 +21,7 @@ import static org.apache.streams.data.ut
* Time: 9:24 AM
* To change this template use File | Settings | File Templates.
*/
-public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer {
+public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
public Activity convert(ObjectNode event) {
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf Mon Feb 10 17:58:17 2014
@@ -8,4 +8,5 @@ twitter {
oauth {
appName = "Apache Streams"
}
+
}
\ No newline at end of file
Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java Mon Feb 10 17:58:17 2014
@@ -18,7 +18,7 @@
package org.apache.streams.core;
-import java.io.Serializable;
+import java.util.List;
import java.util.Queue;
/**
@@ -35,6 +35,6 @@ public interface StreamsProcessor {
public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue);
public Queue<StreamsDatum> getProcessorOutputQueue();
- public StreamsDatum process( StreamsDatum entry );
+ public List<StreamsDatum> process( StreamsDatum entry );
}
Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java Mon Feb 10 17:58:17 2014
@@ -20,7 +20,6 @@ package org.apache.streams.core;
import org.joda.time.DateTime;
-import java.io.Serializable;
import java.math.BigInteger;
import java.util.Queue;
@@ -38,5 +37,4 @@ public interface StreamsProvider {
public StreamsResultSet readNew(BigInteger sequence);
public StreamsResultSet readRange(DateTime start, DateTime end);
-
}
Modified: incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java?rev=1566690&r1=1566689&r2=1566690&view=diff
==============================================================================
--- incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java Mon Feb 10 17:58:17 2014
@@ -23,9 +23,9 @@ import org.apache.streams.pojo.json.Acti
import java.util.List;
/**
- * Serializes and deserializes Activities from a String
+ * Serializes and deserializes Activities
*/
-public interface ActivitySerializer {
+public interface ActivitySerializer<T> {
/**
* Gets the supported content type that can be deserialized/serialized
@@ -35,24 +35,24 @@ public interface ActivitySerializer {
String serializationFormat();
/**
- * Converts the activity to a String representation.
+ * Converts the activity to a POJO representation.
*
* @param deserialized the string
* @return a fully populated Activity object
*/
- String serialize(Activity deserialized);
+ T serialize(Activity deserialized);
/**
- * Converts a string into an Activity
+ * Converts a POJO into an Activity
* @param serialized the string representation
* @return a fully populated Activity object
*/
- Activity deserialize(String serialized);
+ Activity deserialize(T serialized);
/**
- * Converts a string representing multiple activities into a list of Activity objects
- * @param serializedList a String representation of a List
+ * Converts multiple documents into a list of Activity objects
+ * @param serializedList a typed List of documents
* @return a list of fully populated activities
*/
- List<Activity> deserializeAll(String serializedList);
+ List<Activity> deserializeAll(List<T> serializedList);
}