You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:19:47 UTC
[09/71] [abbrv] fixing STREAMS-26 branch
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/README.markdown
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/README.markdown b/trunk/streams-contrib/streams-provider-twitter/README.markdown
deleted file mode 100644
index 184912e..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/README.markdown
+++ /dev/null
@@ -1,48 +0,0 @@
-streams-provider-twitter
-
-Purpose
-
- Module connects to the twitter streaming API, collects events, and passes each message downstream.
-
-Options
-
- Sample - supported, tested
- Firehose - supported, not tested
- Site - not currently supported
-
-Capabilities
-
- Validation
-
- Optionally, module will validate each message
-
- Simplification
-
- Optionally, module can output messages as basic text
-
- Normalization
-
- Optionally, module can output messages as other json objects such as Activity
-
- Deletion
-
- By default, module will submit delete the object from each directly connected persist step (not implemented)
-
-Run-modes
-
- Standalone
-
- Runs in a java process.
- Writes to standard out.
-
- Placeholder for how
- Configure via property file
- Configure via command line
-
- Storm
-
- Runs as a spout.
-
- Placeholder for how
- Configure via property file
- Configure via command line
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/pom.xml b/trunk/streams-contrib/streams-provider-twitter/pom.xml
deleted file mode 100644
index 9a12bbc..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/pom.xml
+++ /dev/null
@@ -1,150 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-contrib</artifactId>
- <version>0.1-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>streams-provider-twitter</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-pojo</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-config</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>com.jayway.jsonpath</groupId>
- <artifactId>json-path</artifactId>
- </dependency>
- <dependency>
- <groupId>com.jayway.jsonpath</groupId>
- <artifactId>json-path-assert</artifactId>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>hbc-core</artifactId>
- <version>1.4.2</version>
- </dependency>
- <dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-core</artifactId>
- <version>3.0.5</version>
- </dependency>
- <!--<dependency>-->
- <!--<groupId>com.twitter</groupId>-->
- <!--<artifactId>finagle-core_2.10</artifactId>-->
- <!--<version>6.9.0</version>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>com.twitter</groupId>-->
- <!--<artifactId>finagle-http_2.10</artifactId>-->
- <!--<version>6.9.0</version>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>com.twitter</groupId>-->
- <!--<artifactId>finagle-stream_2.10</artifactId>-->
- <!--<version>6.9.0</version>-->
- <!--</dependency>-->
- </dependencies>
-
- <build>
- <sourceDirectory>src/main/java</sourceDirectory>
- <testSourceDirectory>src/test/java</testSourceDirectory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- <testResources>
- <testResource>
- <directory>src/test/resources</directory>
- </testResource>
- </testResources>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>target/generated-sources/jsonschema2pojo</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-maven-plugin</artifactId>
- <configuration>
- <addCompileSourceRoot>true</addCompileSourceRoot>
- <generateBuilders>true</generateBuilders>
- <sourcePaths>
- <sourcePath>src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json</sourcePath>
- <sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath>
- <sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath>
- <sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath>
- </sourcePaths>
- <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
- <targetPackage>org.apache.streams.twitter.pojo</targetPackage>
- <useLongIntegers>true</useLongIntegers>
- <useJodaDates>false</useJodaDates>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>generate</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
deleted file mode 100644
index bc5a385..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-
-/**
- * Created by steveblackmon on 2/8/14.
- */
-public class TwitterErrorHandler
-{
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
-
- protected static final long initial_backoff = 1000;
- protected static long backoff = initial_backoff;
-
- public static int handleTwitterError(Twitter twitter, Exception exception)
- {
- if(exception instanceof TwitterException)
- {
- TwitterException e = (TwitterException)exception;
- if(e.exceededRateLimitation())
- {
- LOGGER.warn("Rate Limit Exceeded");
- try {
- Thread.sleep(backoff *= 2);
- } catch (InterruptedException e1) {}
- return 1;
- }
- else if(e.isCausedByNetworkIssue())
- {
- LOGGER.info("Twitter Network Issues Detected. Backing off...");
- LOGGER.info("{} - {}", e.getExceptionCode(), e.getLocalizedMessage());
- try {
- Thread.sleep(backoff *= 2);
- } catch (InterruptedException e1) {}
- return 1;
- }
- else if(e.isErrorMessageAvailable())
- {
- if(e.getMessage().toLowerCase().contains("does not exist"))
- {
- LOGGER.warn("User does not exist...");
- return 100;
- }
- else
- {
- return 1;
- }
- }
- else
- {
- if(e.getExceptionCode().equals("ced778ef-0c669ac0"))
- {
- // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
- return 5;
- }
- else
- {
- LOGGER.warn("Unknown Twitter Exception...");
- LOGGER.warn(" Account: {}", twitter);
- LOGGER.warn(" Access: {}", e.getAccessLevel());
- LOGGER.warn(" Code: {}", e.getExceptionCode());
- LOGGER.warn(" Message: {}", e.getLocalizedMessage());
- return 1;
- }
- }
- }
- else if(exception instanceof RuntimeException)
- {
- LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
- return 1;
- }
- else
- {
- LOGGER.info("Completely Unknown Exception: {}", exception);
- return 1;
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
deleted file mode 100644
index d31c346..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.jayway.jsonassert.JsonAssert;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-
-/**
- * Created by sblackmon on 12/13/13.
- */
-public class TwitterEventClassifier {
-
- public static Class detectClass( String json ) {
-
- try {
- JsonAssert.with(json).assertNull("$.delete");
- } catch( AssertionError ae ) {
- return Delete.class;
- }
-
- try {
- JsonAssert.with(json).assertNull("$.retweeted_status");
- } catch( AssertionError ae ) {
- return Retweet.class;
- }
-
- return Tweet.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
deleted file mode 100644
index 934d533..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterEventProcessor implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
-
- private ObjectMapper mapper = new ObjectMapper();
-
- private BlockingQueue<String> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- private Class inClass;
- private Class outClass;
-
- private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
- private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
- private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
-
- public final static String TERMINATE = new String("TERMINATE");
-
- public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
- this.inQueue = inQueue;
- this.outQueue = outQueue;
- this.inClass = inClass;
- this.outClass = outClass;
- }
-
- public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
- this.inQueue = inQueue;
- this.outQueue = outQueue;
- this.outClass = outClass;
- }
-
- @Override
- public void run() {
-
- while(true) {
- try {
- String item = inQueue.take();
- Thread.sleep(new Random().nextInt(100));
- if(item==TERMINATE) {
- LOGGER.info("Terminating!");
- return;
- }
-
- // first check for valid json
- ObjectNode node = (ObjectNode)mapper.readTree(item);
-
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass(item);
-
- // if the target is string, just pass-through
- if( java.lang.String.class.equals(outClass))
- outQueue.offer(new StreamsDatum(item));
- else {
- // convert to desired format
- Object out = convert(node, inClass, outClass);
-
- if( out != null && validate(out, outClass))
- outQueue.offer(new StreamsDatum(out));
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- public Object convert(ObjectNode event, Class inClass, Class outClass) {
-
- LOGGER.debug(event.toString());
-
- Object result = null;
-
- if( outClass.equals( Activity.class )) {
- if( inClass.equals( Delete.class )) {
- LOGGER.debug("ACTIVITY DELETE");
- result = twitterJsonDeleteActivitySerializer.convert(event);
- } else if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("ACTIVITY RETWEET");
- result = twitterJsonRetweetActivitySerializer.convert(event);
- } else if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("ACTIVITY TWEET");
- result = twitterJsonTweetActivitySerializer.convert(event);
- } else {
- return null;
- }
- } else if( outClass.equals( Tweet.class )) {
- if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("TWEET");
- result = mapper.convertValue(event, Tweet.class);
- }
- } else if( outClass.equals( Retweet.class )) {
- if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("RETWEET");
- result = mapper.convertValue(event, Retweet.class);
- }
- } else if( outClass.equals( Delete.class )) {
- if ( inClass.equals( Delete.class )) {
- LOGGER.debug("DELETE");
- result = mapper.convertValue(event, Delete.class);
- }
- } else if( outClass.equals( ObjectNode.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.convertValue(event, ObjectNode.class);
- }
-
- // no supported conversion were applied
- if( result != null )
- return result;
-
- LOGGER.debug("CONVERT FAILED");
-
- return null;
-
- }
-
- public boolean validate(Object document, Class klass) {
-
- // TODO
- return true;
- }
-
- public boolean isValidJSON(final String json) {
- boolean valid = false;
- try {
- final JsonParser parser = new ObjectMapper().getJsonFactory()
- .createJsonParser(json);
- while (parser.nextToken() != null) {
- }
- valid = true;
- } catch (JsonParseException jpe) {
- LOGGER.warn("validate: {}", jpe);
- } catch (IOException ioe) {
- LOGGER.warn("validate: {}", ioe);
- }
-
- return valid;
- }
-
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
deleted file mode 100644
index 7bb7048..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.TwitterOAuthConfiguration;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterStreamConfigurator {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamConfigurator.class);
-
- public static TwitterStreamConfiguration detectConfiguration(Config twitter) {
- Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth");
-
- TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
- twitterStreamConfiguration.setProtocol(twitter.getString("protocol"));
- twitterStreamConfiguration.setHost(twitter.getString("host"));
- twitterStreamConfiguration.setPort(twitter.getLong("port"));
- twitterStreamConfiguration.setVersion(twitter.getString("version"));
- TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration();
- twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
- twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
- twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
- twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
- twitterStreamConfiguration.setOauth(twitterOAuthConfiguration);
-
- try {
- twitterStreamConfiguration.setTrack(twitter.getStringList("track"));
- } catch( ConfigException ce ) {}
- try {
- List<Long> follows = Lists.newArrayList();
- for( Integer id : twitter.getIntList("follow"))
- follows.add(new Long(id));
- twitterStreamConfiguration.setFollow(follows);
- } catch( ConfigException ce ) {}
-
- twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
- twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
- twitterStreamConfiguration.setJsonStoreEnabled("true");
- twitterStreamConfiguration.setIncludeEntities("true");
-
- return twitterStreamConfiguration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
deleted file mode 100644
index 49b54b2..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ /dev/null
@@ -1,161 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.*;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterStreamProvider implements StreamsProvider, Serializable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
-
- private TwitterStreamConfiguration config;
-
- private Class klass;
-
- public TwitterStreamConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(TwitterStreamConfiguration config) {
- this.config = config;
- }
-
- protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
-
- protected volatile Queue<StreamsDatum> providerQueue;
-
- protected StreamingEndpoint endpoint;
- protected BasicClient client;
-
- protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
-
- public TwitterStreamProvider() {
- Config config = StreamsConfigurator.config.getConfig("twitter");
- this.config = TwitterStreamConfigurator.detectConfiguration(config);
- }
-
- public TwitterStreamProvider(TwitterStreamConfiguration config) {
- this.config = config;
- }
-
- public TwitterStreamProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("twitter");
- this.config = TwitterStreamConfigurator.detectConfiguration(config);
- this.klass = klass;
- providerQueue = new LinkedBlockingQueue<StreamsDatum>();
- }
-
- public TwitterStreamProvider(TwitterStreamConfiguration config, Class klass) {
- this.config = config;
- this.klass = klass;
- providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
- }
-
- public void run() {
-
- for (int i = 0; i < 10; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
-
- new Thread(new TwitterStreamProviderTask(this)).start();
- }
-
- @Override
- public StreamsResultSet readCurrent() {
- run();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public void prepare(Object o) {
-
- Preconditions.checkNotNull(this.klass);
-
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
- Preconditions.checkNotNull(config.getEndpoint());
- if(config.getEndpoint().endsWith("sample.json") ) {
- endpoint = new StatusesSampleEndpoint();
-
- Optional<List<String>> track = Optional.fromNullable(config.getTrack());
- Optional<List<Long>> follow = Optional.fromNullable(config.getFollow());
-
- if( track.isPresent() ) endpoint.addPostParameter("track", Joiner.on(",").join(track.get()));
- if( follow.isPresent() ) endpoint.addPostParameter("follow", Joiner.on(",").join(follow.get()));
- }
- else if( config.getEndpoint().endsWith("firehose.json"))
- endpoint = new StatusesFirehoseEndpoint();
- else
- return;
-
- Authentication auth = new OAuth1(config.getOauth().getConsumerKey(),
- config.getOauth().getConsumerSecret(),
- config.getOauth().getAccessToken(),
- config.getOauth().getAccessTokenSecret());
-
- client = new ClientBuilder()
- .name("apache/streams/streams-contrib/streams-provider-twitter")
- .hosts(Constants.STREAM_HOST)
- .endpoint(endpoint)
- .authentication(auth)
- .processor(new StringDelimitedProcessor(inQueue))
- .build();
- }
-
- @Override
- public void cleanUp() {
- for (int i = 0; i < 10; i++) {
- inQueue.add(TwitterEventProcessor.TERMINATE);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
deleted file mode 100644
index 7270fe9..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProviderTask.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.endpoint.StreamingEndpoint;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.*;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterStreamProviderTask implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProviderTask.class);
-
- private TwitterStreamProvider provider;
-
- public TwitterStreamProviderTask(TwitterStreamProvider provider) {
- this.provider = provider;
- }
-
- @Override
- public void run() {
-
- provider.client.connect();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
deleted file mode 100644
index ebf9dbc..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
-import twitter4j.conf.ConfigurationBuilder;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.*;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
-
- private TwitterStreamConfiguration config;
-
- private Class klass;
-
- public TwitterStreamConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(TwitterStreamConfiguration config) {
- this.config = config;
- }
-
- protected volatile BlockingQueue<String> inQueue = new LinkedBlockingQueue<String>(10000);
-
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
- protected Twitter client;
-
- ListenableFuture providerTaskComplete;
-//
-// public BlockingQueue<Object> getInQueue() {
-// return inQueue;
-// }
-
- protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
- protected DateTime start;
- protected DateTime end;
-
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
-
- public TwitterTimelineProvider() {
- Config config = StreamsConfigurator.config.getConfig("twitter");
- this.config = TwitterStreamConfigurator.detectConfiguration(config);
- }
-
- public TwitterTimelineProvider(TwitterStreamConfiguration config) {
- this.config = config;
- }
-
- public TwitterTimelineProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("twitter");
- this.config = TwitterStreamConfigurator.detectConfiguration(config);
- this.klass = klass;
- }
-
- public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
- this.config = config;
- this.klass = klass;
- }
-
- public void run() {
-
- Preconditions.checkNotNull(providerQueue);
-
- Preconditions.checkNotNull(this.klass);
-
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
- Preconditions.checkNotNull(config.getFollow());
-
- Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
-
- Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
- Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
- Iterator<Long> ids = config.getFollow().iterator();
- while( ids.hasNext() ) {
- Long id = ids.next();
-
- String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(includeEntitiesEnabled)
- .setJSONStoreEnabled(jsonStoreEnabled)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl);
-
- Twitter twitter = new TwitterFactory(builder.build()).getInstance();
-
- providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
- }
-
- for (int i = 0; i < 1; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
- }
-
- @Override
- public StreamsResultSet readCurrent() {
- run();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- throw new NotImplementedException();
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- this.start = start;
- this.end = end;
- run();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
- }
-
- void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
-
- @Override
- public void prepare(Object o) {
-
- Preconditions.checkNotNull(providerQueue);
-
- Preconditions.checkNotNull(this.klass);
-
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
- Preconditions.checkNotNull(config.getFollow());
-
- Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
-
- Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
- Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
- Iterator<Long> ids = config.getFollow().iterator();
- while( ids.hasNext() ) {
- Long id = ids.next();
-
- String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(includeEntitiesEnabled)
- .setJSONStoreEnabled(jsonStoreEnabled)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl);
-
- Twitter twitter = new TwitterFactory(builder.build()).getInstance();
- providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
- }
-
- for (int i = 0; i < 1; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
- }
-
- @Override
- public void cleanUp() {
- shutdownAndAwaitTermination(executor);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
deleted file mode 100644
index fcab6f5..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.Paging;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.json.DataObjectFactory;
-
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterTimelineProviderTask implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
-
- private TwitterTimelineProvider provider;
- private Twitter twitter;
- private Long id;
-
- public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) {
- this.provider = provider;
- this.twitter = twitter;
- this.id = id;
- }
-
- @Override
- public void run() {
-
- Paging paging = new Paging(1, 200);
- List<Status> statuses = null;
- boolean KeepGoing = true;
- boolean hadFailure = false;
-
- do
- {
- int keepTrying = 0;
-
- // keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1)
- {
-
- try
- {
- statuses = twitter.getUserTimeline(id, paging);
-
- for (Status tStat : statuses)
- {
- if( provider.start != null &&
- provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
- {
- // they hit the last date we wanted to collect
- // we can now exit early
- KeepGoing = false;
- }
- // emit the record
- String json = DataObjectFactory.getRawJSON(tStat);
-
- provider.inQueue.offer(json);
-
- }
-
-
- paging.setPage(paging.getPage() + 1);
-
- keepTrying = 10;
- }
- catch(Exception e)
- {
- hadFailure = true;
- keepTrying += TwitterErrorHandler.handleTwitterError(twitter, e);
- }
- finally
- {
- // Shutdown the twitter to release the resources
- twitter.shutdown();
- }
- }
- }
- while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
-
- LOGGER.info("Provider Finished. Cleaning up...");
-
- twitter.shutdown();
-
- LOGGER.info("Provider Exiting");
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
deleted file mode 100644
index f13bf91..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Tweet;
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public class TwitterJsonDeleteActivitySerializer extends TwitterJsonEventActivitySerializer {
-
- public Activity convert(ObjectNode event) {
-
- Delete delete = null;
- try {
- delete = mapper.treeToValue(event, Delete.class);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
-
- Activity activity = new Activity();
- activity.setActor(buildActor(delete));
- activity.setVerb("delete");
- activity.setObject(buildActivityObject(delete));
- activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
- activity.setProvider(buildProvider(event));
- addTwitterExtension(activity, event);
- return activity;
- }
-
- public Actor buildActor(Delete delete) {
- Actor actor = new Actor();
- actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
- return actor;
- }
-
- public ActivityObject buildActivityObject(Delete delete) {
- ActivityObject actObj = new ActivityObject();
- actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
- actObj.setObjectType("tweet");
- return actObj;
- }
-
- public ActivityObject buildTarget(Tweet tweet) {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
deleted file mode 100644
index 83be402..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Generator;
-import org.apache.streams.pojo.json.Icon;
-import org.apache.streams.pojo.json.Provider;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
-
- public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
-
- ObjectMapper mapper = new ObjectMapper();
-
- @Override
- public String serializationFormat() {
- return "application/json+vnd.twitter.com.v1";
- }
-
- @Override
- public String serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Twitter JSON");
- }
-
- @Override
- public Activity deserialize(String serialized) {
- serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
-
-// System.out.println(serialized);
-
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
- mapper.setAnnotationIntrospector(introspector);
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
- mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
- mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
- try {
- ObjectNode event = (ObjectNode) mapper.readTree(serialized);
-
- Activity activity = convert(event);
-
- return activity;
-
- } catch (IOException e) {
- throw new IllegalArgumentException("Unable to deserialize", e);
- }
-
- }
-
- public abstract Activity convert(ObjectNode event);
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- throw new NotImplementedException("Not currently implemented");
- }
-
- public static Date parse(String str) {
- Date date;
- String dstr;
- DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
- DateFormat out = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
- try {
- date = fmt.parse(str);
- dstr = out.format(date);
- return out.parse(dstr);
- } catch (ParseException e) {
- throw new IllegalArgumentException("Invalid date format", e);
- }
- }
-
- public static Generator buildGenerator(ObjectNode event) {
- return null;
- }
-
- public static Icon getIcon(ObjectNode event) {
- return null;
- }
-
- public static Provider buildProvider(ObjectNode event) {
- Provider provider = new Provider();
- provider.setId("id:providers:twitter");
- return provider;
- }
-
- public static String getUrls(ObjectNode event) {
- return null;
- }
-
- public static void addTwitterExtension(Activity activity, ObjectNode event) {
- Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
- extensions.put("twitter", event);
- }
-
- public static String formatId(String... idparts) {
- return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
deleted file mode 100644
index 69860a1..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.Url;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
-
- public Activity convert(ObjectNode event) {
-
- Retweet retweet = null;
- try {
- retweet = mapper.treeToValue(event, Retweet.class);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
-
- Activity activity = new Activity();
- activity.setActor(buildActor(retweet));
- activity.setVerb("share");
- activity.setObject(buildActivityObject(retweet.getRetweetedStatus()));
- activity.setId(formatId(activity.getVerb(), retweet.getIdStr()));
- activity.setPublished(parse(retweet.getCreatedAt()));
- activity.setGenerator(buildGenerator(event));
- activity.setIcon(getIcon(event));
- activity.setProvider(buildProvider(event));
- activity.setTitle("");
- activity.setContent(retweet.getRetweetedStatus().getText());
- activity.setUrl(getUrls(event));
- activity.setLinks(getLinks(retweet));
- addTwitterExtension(activity, event);
- addLocationExtension(activity, retweet);
- return activity;
- }
-
- public static Actor buildActor(Tweet tweet) {
- Actor actor = new Actor();
- User user = tweet.getUser();
- actor.setId(formatId(user.getIdStr(), tweet.getIdStr()));
- actor.setDisplayName(user.getScreenName());
- actor.setId(user.getIdStr());
- if (user.getUrl()!=null){
- actor.setUrl(user.getUrl());
- }
- return actor;
- }
-
- public static ActivityObject buildActivityObject(Tweet tweet) {
- ActivityObject actObj = new ActivityObject();
- actObj.setId(formatId(tweet.getIdStr()));
- actObj.setObjectType("tweet");
- return actObj;
- }
-
- public static List<Object> getLinks(Retweet retweet) {
- List<Object> links = Lists.newArrayList();
- for( Url url : retweet.getRetweetedStatus().getEntities().getUrls() ) {
- links.add(url.getExpandedUrl());
- }
- return links;
- }
-
- public static void addLocationExtension(Activity activity, Retweet retweet) {
- Map<String, Object> extensions = ensureExtensions(activity);
- Map<String, Object> location = new HashMap<String, Object>();
- location.put("id", formatId(retweet.getIdStr()));
- location.put("coordinates", retweet.getCoordinates());
- extensions.put("location", location);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
deleted file mode 100644
index 08727d8..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.Url;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
-
- public Activity convert(ObjectNode event) {
-
- Tweet tweet = null;
- try {
- tweet = mapper.treeToValue(event, Tweet.class);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
-
- Activity activity = new Activity();
- activity.setActor(buildActor(tweet));
- activity.setVerb("post");
- activity.setObject(buildActivityObject(tweet));
- activity.setId(formatId(activity.getVerb(), tweet.getIdStr()));
- activity.setTarget(buildTarget(tweet));
- activity.setPublished(parse(tweet.getCreatedAt()));
- activity.setGenerator(buildGenerator(event));
- activity.setIcon(getIcon(event));
- activity.setProvider(buildProvider(event));
- activity.setTitle("");
- activity.setContent(tweet.getText());
- activity.setUrl(getUrls(event));
- activity.setLinks(getLinks(tweet));
- addTwitterExtension(activity, event);
- addLocationExtension(activity, tweet);
- return activity;
- }
-
- public static Actor buildActor(Tweet tweet) {
- Actor actor = new Actor();
- User user = tweet.getUser();
- actor.setId(formatId(user.getIdStr(), tweet.getIdStr()));
- actor.setDisplayName(user.getScreenName());
- actor.setId(user.getIdStr());
- if (user.getUrl()!=null){
- actor.setUrl(user.getUrl());
- }
- return actor;
- }
-
- public static ActivityObject buildActivityObject(Tweet tweet) {
- ActivityObject actObj = new ActivityObject();
- actObj.setId(formatId(tweet.getIdStr()));
- actObj.setObjectType("tweet");
- return actObj;
- }
-
- public static List<Object> getLinks(Tweet tweet) {
- List<Object> links = Lists.newArrayList();
- for( Url url : tweet.getEntities().getUrls() ) {
- links.add(url.getExpandedUrl());
- }
- return links;
- }
-
- public static ActivityObject buildTarget(Tweet tweet) {
- return null;
- }
-
- public static void addLocationExtension(Activity activity, Tweet tweet) {
- Map<String, Object> extensions = ensureExtensions(activity);
- Map<String, Object> location = new HashMap<String, Object>();
- location.put("id", formatId(tweet.getIdStr()));
- location.put("coordinates", tweet.getCoordinates());
- extensions.put("location", location);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Delete.json
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Delete.json b/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Delete.json
deleted file mode 100644
index 6b2efb0..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Delete.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "id": "#",
- "javaType" : "org.apache.streams.twitter.pojo.Delete",
- "properties": {
- "delete": {
- "type": "object",
- "javaType" : "org.apache.streams.twitter.pojo.DeleteDetails",
- "properties": {
- "status": {
- "type": "object",
- "properties": {
- "id": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "user_id": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "id_str": {
- "type": "string"
- },
- "user_id_str": {
- "type": "string"
- }
- }
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Retweet.json
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Retweet.json b/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Retweet.json
deleted file mode 100644
index 018a263..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/Retweet.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "id": "#",
- "javaType" : "org.apache.streams.twitter.pojo.Retweet",
- "extends": {"$ref":"tweet.json"},
- "properties": {
- "retweeted_status": {
- "type": "object",
- "required" : false,
- "description" : "Describes the tweet being retweeted.",
- "$ref" : "tweet.json"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json b/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
deleted file mode 100644
index 087f8fd..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
+++ /dev/null
@@ -1,79 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "id": "#",
- "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "protocol": {
- "type": "string",
- "description": "The protocol"
- },
- "host": {
- "type": "string",
- "description": "The host"
- },
- "port": {
- "type": "integer",
- "description": "The port"
- },
- "version": {
- "type": "string",
- "description": "The version"
- },
- "endpoint": {
- "type": "string",
- "description": "The endpoint"
- },
- "includeEntities": {
- "type": "string"
- },
- "jsonStoreEnabled": {
- "type": "string"
- },
- "truncated": {
- "type": "boolean"
- },
- "filter-level": {
- "type": "string",
- "description": "Setting this parameter to one of none, low, or medium will set the minimum value of the filter_level Tweet attribute required to be included in the stream"
- },
- "follow": {
- "type": "array",
- "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
- "items": {
- "type": "integer"
- }
- },
- "track": {
- "type": "array",
- "description": "A list of phrases which will be used to determine what Tweets will be delivered on the stream",
- "items": {
- "type": "string"
- }
- },
- "oauth": {
- "type": "object",
- "dynamic": "true",
- "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "appName": {
- "type": "string"
- },
- "consumerKey": {
- "type": "string"
- },
- "consumerSecret": {
- "type": "string"
- },
- "accessToken": {
- "type": "string"
- },
- "accessTokenSecret": {
- "type": "string"
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json b/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json
deleted file mode 100644
index dcde108..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json
+++ /dev/null
@@ -1,296 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "id": "#",
- "javaType" : "org.apache.streams.twitter.pojo.Tweet",
- "properties": {
- "text": {
- "type": "string"
- },
- "retweeted": {
- "type": "boolean"
- },
- "in_reply_to_screen_name": {
- "type": "string"
- },
- "truncated": {
- "type": "boolean"
- },
- "filter_level": {
- "type": "string"
- },
- "contributors": {
- "type": "array",
- "items": {
- "type": "object",
- "javaType" : "org.apache.streams.twitter.pojo.Contributor",
- "properties": {
- "id": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "id_str": {
- "type": "string"
- },
- "screen_name": {
- "type": "string"
- }
- }
- }
- },
- "coordinates": {
- "type": "object",
- "javaType" : "org.apache.streams.twitter.pojo.Coordinates",
- "items": {
- "properties": {
- "type": {
- "type": "string"
- },
- "coordinates": {
- "type": "array",
- "items": [
- {
- "type": "integer"
- }
- ]
- }
- }
- }
- },
- "entities": {
- "type": "object",
- "dynamic": "true",
- "javaType" : "org.apache.streams.twitter.pojo.Entities",
- "properties": {
- "user_mentions": {
- "type": "array",
- "items": {
- "type": "object",
- "javaType" : "org.apache.streams.twitter.pojo.UserMentions",
- "properties": {
- "id": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "name": {
- "type": "string"
- },
- "indices": {
- "type": "array",
- "items": [{
- "type" : "integer"
- }]
- },
- "screen_name": {
- "type": "string"
- },
- "id_str": {
- "type": "string"
- }
- }
- }
- },
- "hashtags": {
- "type": "array",
- "items": [
- {
- "type": "string"
- }
- ]
- },
- "urls": {
- "type": "array",
- "items": {
- "type": "object",
- "javaType": "org.apache.streams.twitter.Url",
- "properties": {
- "expanded_url": {
- "type": "string"
- },
- "indices": {
- "type": "array",
- "items": [
- {
- "type" : "integer"
- }
- ]
- },
- "display_url": {
- "type": "string"
- },
- "url": {
- "type": "string"
- }
- }
- }
- }
- }
- },
- "in_reply_to_status_id_str": {
- "type": "string"
- },
- "id": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "in_reply_to_user_id_str": {
- "type": "string"
- },
- "source": {
- "type": "string"
- },
- "lang": {
- "type": "string"
- },
- "favorited": {
- "type": "boolean"
- },
- "possibly_sensitive": {
- "type": "boolean"
- },
- "in_reply_to_status_id": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "created_at": {
- "type": "string"
- },
- "in_reply_to_user_id": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "retweet_count": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "id_str": {
- "type": "string"
- },
- "user": {
- "id": "user",
- "type": "object",
- "javaType" : "org.apache.streams.twitter.pojo.User",
- "dynamic": "true",
- "properties": {
- "location": {
- "type": "string"
- },
- "default_profile": {
- "type": "boolean"
- },
- "statuses_count": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "profile_background_tile": {
- "type": "boolean"
- },
- "lang": {
- "type": "string"
- },
- "profile_link_color": {
- "type": "string"
- },
- "id": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "protected": {
- "type": "boolean"
- },
- "favourites_count": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "profile_text_color": {
- "type": "string"
- },
- "verified": {
- "type": "boolean"
- },
- "description": {
- "type": "string"
- },
- "contributors_enabled": {
- "type": "boolean"
- },
- "name": {
- "type": "string"
- },
- "profile_sidebar_border_color": {
- "type": "string"
- },
- "profile_background_color": {
- "type": "string"
- },
- "created_at": {
- "type": "string"
- },
- "default_profile_image": {
- "type": "boolean"
- },
- "followers_count": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "geo_enabled": {
- "type": "boolean"
- },
- "profile_image_url_https": {
- "type": "string"
- },
- "profile_background_image_url": {
- "type": "string"
- },
- "profile_background_image_url_https": {
- "type": "string"
- },
- "follow_request_sent": {
- "type": "boolean"
- },
- "url": {
- "type": "string"
- },
- "utc_offset": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "time_zone": {
- "type": "string"
- },
- "profile_use_background_image": {
- "type": "boolean"
- },
- "friends_count": {
- "ignore_malformed": false,
- "type": "integer"
- },
- "profile_sidebar_fill_color": {
- "type": "string"
- },
- "screen_name": {
- "type": "string"
- },
- "id_str": {
- "type": "string"
- },
- "profile_image_url": {
- "type": "string"
- },
- "is_translator": {
- "type": "boolean"
- },
- "listed_count": {
- "ignore_malformed": false,
- "type": "integer"
- }
- }
- },
- "retweeted_status": {
- "type": "object",
- "required" : false,
- "description" : "Describes the tweet being retweeted.",
- "$ref" : "#"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf b/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
deleted file mode 100644
index 49555fc..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/main/resources/reference.conf
+++ /dev/null
@@ -1,12 +0,0 @@
-twitter {
- protocol = "https"
- host = "stream.twitter.com"
- port = 443
- version = "1.1"
- endpoint = "statuses/sample.json"
- filter-level = "none"
- oauth {
- appName = "Apache Streams"
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6ea69f29/trunk/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/trunk/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/trunk/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
deleted file mode 100644
index 0664595..0000000
--- a/trunk/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static java.util.regex.Pattern.matches;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
-public class TweetSerDeTest {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TweetSerDeTest.class);
- private ObjectMapper mapper = new ObjectMapper();
-
- private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
- private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
- private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
-
- // @Ignore
- @Test
- public void Tests()
- {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
- InputStream is = TweetSerDeTest.class.getResourceAsStream("/twitter_jsons.txt");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
-
- try {
- while (br.ready()) {
- String line = br.readLine();
- if(!StringUtils.isEmpty(line))
- {
- LOGGER.info("raw: {}", line);
-
- Class detected = TwitterEventClassifier.detectClass(line);
-
- ObjectNode event = (ObjectNode) mapper.readTree(line);
-
- assertThat(event, is(not(nullValue())));
-
- String tweetstring = mapper.writeValueAsString(event);
-
- LOGGER.info("{}: {}", detected.getName(), tweetstring);
-
- Activity activity;
- if( detected.equals( Delete.class )) {
- activity = twitterJsonDeleteActivitySerializer.convert(event);
- } else if ( detected.equals( Retweet.class )) {
- activity = twitterJsonRetweetActivitySerializer.convert(event);
- } else if ( detected.equals( Tweet.class )) {
- activity = twitterJsonTweetActivitySerializer.convert(event);
- } else {
- Assert.fail();
- return;
- }
-
- String activitystring = mapper.writeValueAsString(activity);
-
- LOGGER.info("activity: {}", activitystring);
-
- assertThat(activity, is(not(nullValue())));
- if(activity.getId() != null) {
- assertThat(matches("id:.*:[a-z]*:[a-zA-Z0-9]*", activity.getId()), is(true));
- }
- assertThat(activity.getActor(), is(not(nullValue())));
- assertThat(activity.getActor().getId(), is(not(nullValue())));
- assertThat(activity.getVerb(), is(not(nullValue())));
- assertThat(activity.getObject(), is(not(nullValue())));
- assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
- }
- }
- } catch( Exception e ) {
- System.out.println(e);
- e.printStackTrace();
- Assert.fail();
- }
- }
-}