You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/07 00:29:19 UTC
[2/2] incubator-streams git commit: use StreamsJacksonMapper with
channel-specific date support deprecate StreamsTwitterMapper deprecate
StreamsDatasiftMapper use generic TypeConverterProcessor deprecate/delete now
unnecessary classes refactor event classification
use StreamsJacksonMapper with channel-specific date support
deprecate StreamsTwitterMapper
deprecate StreamsDatasiftMapper
use generic TypeConverterProcessor
deprecate/delete now unnecessary classes
refactor event classification
add event classification tests
refactor channel serialization
update tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f6a6574
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f6a6574
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f6a6574
Branch: refs/heads/STREAMS-212
Commit: 2f6a6574e1a3d87fd6ae1457f35ce663fc914173
Parents: 9aebd0b
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 15:27:52 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 6 15:27:52 2014 -0800
----------------------------------------------------------------------
.../DatasiftTypeConverterProcessor.java | 7 +-
.../serializer/DatasiftActivitySerializer.java | 12 +-
.../DatasiftDefaultActivitySerializer.java | 214 -------------------
.../DatasiftInstagramActivitySerializer.java | 8 +-
.../DatasiftTweetActivitySerializer.java | 8 +-
.../datasift/util/StreamsDatasiftMapper.java | 11 +-
.../com/datasift/test/DatasiftSerDeTest.java | 18 +-
.../DatasiftTypeConverterProcessorTest.java | 72 -------
.../DatasiftActivitySerializerTest.java | 6 +-
.../serializer/DatasiftEventClassifierTest.java | 76 +++++++
.../streams-provider-twitter/pom.xml | 6 +
.../processor/TwitterEventProcessor.java | 194 -----------------
.../processor/TwitterProfileProcessor.java | 133 ------------
.../twitter/processor/TwitterTypeConverter.java | 209 ------------------
.../provider/TwitterEventClassifier.java | 58 +++--
.../serializer/StreamsTwitterMapper.java | 11 +-
.../TwitterJsonActivitySerializer.java | 24 +--
.../TwitterJsonDeleteActivitySerializer.java | 6 +
.../TwitterJsonRetweetActivitySerializer.java | 6 +
.../TwitterJsonTweetActivitySerializer.java | 6 +
.../TwitterJsonUserActivitySerializer.java | 6 +
...erJsonUserstreameventActivitySerializer.java | 6 +
.../streams/twitter/test/SimpleTweetTest.java | 11 +-
.../twitter/test/TweetActivitySerDeTest.java | 6 +-
.../streams/twitter/test/TweetSerDeTest.java | 6 +-
.../test/TwitterEventClassifierTest.java | 34 +++
26 files changed, 261 insertions(+), 893 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
index a00cf23..1166b2e 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
@@ -57,7 +57,12 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
List<StreamsDatum> result = Lists.newLinkedList();
Object doc;
try {
- doc = this.converter.convert(entry.getDocument(), this.mapper);
+ if( entry.getDocument() instanceof String ) {
+ ObjectNode node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class);
+ doc = this.converter.convert(node, this.mapper);
+ } else {
+ doc = this.converter.convert(entry.getDocument(), this.mapper);
+ }
if(doc != null) {
result.add(new StreamsDatum(doc, entry.getId()));
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
index 0ada979..b587cd6 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -32,9 +32,6 @@ import java.util.List;
*/
public class DatasiftActivitySerializer implements ActivitySerializer<Datasift> {
- private static final DatasiftDefaultActivitySerializer DEFAULT_SERIALIZER = new DatasiftDefaultActivitySerializer();
- private static final DatasiftTweetActivitySerializer TWITTER_SERIALIZER = new DatasiftTweetActivitySerializer();
- private static final DatasiftInstagramActivitySerializer INSTAGRAM_SERIALIZER = new DatasiftInstagramActivitySerializer();
private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
@Override
@@ -49,13 +46,8 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>
@Override
public Activity deserialize(Datasift serialized) throws ActivitySerializerException {
- if(serialized.getTwitter() != null) {
- return TWITTER_SERIALIZER.deserialize(serialized);
- } else if(serialized.getInstagram() != null) {
- return INSTAGRAM_SERIALIZER.deserialize(serialized);
- } else {
- return DEFAULT_SERIALIZER.deserialize(serialized);
- }
+ ActivitySerializer serializer = DatasiftEventClassifier.bestSerializer(serialized);
+ return serializer.deserialize(serialized);
}
public Activity deserialize(String json) throws ActivitySerializerException {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
deleted file mode 100644
index 678f67b..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ /dev/null
@@ -1,214 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.links.Links;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftDefaultActivitySerializer.class);
-
- public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
-
- ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
-
- @Override
- public String serializationFormat() {
- return "application/json+datasift.com.v1.1";
- }
-
- @Override
- public Datasift serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
- }
-
- public Activity deserialize(String datasiftJson) {
- try {
- return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
- } catch (Exception e) {
- LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
- LOGGER.error("Exception : {}", e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Activity deserialize(Datasift serialized) {
-
- try {
-
- Activity activity = convert(serialized);
-
- return activity;
-
- } catch (Exception e) {
- throw new IllegalArgumentException("Unable to deserialize", e);
- }
-
- }
-
- @Override
- public List<Activity> deserializeAll(List<Datasift> datasifts) {
- List<Activity> activities = Lists.newArrayList();
- for( Datasift datasift : datasifts ) {
- activities.add(deserialize(datasift));
- }
- return activities;
- }
-
- public static Generator buildGenerator(Interaction interaction) {
- Generator generator = new Generator();
- generator.setDisplayName(interaction.getSource());
- generator.setId(interaction.getSource());
- return generator;
- }
-
- public static Icon getIcon(Interaction interaction) {
- return null;
- }
-
- public static Provider buildProvider(Interaction interaction) {
- Provider provider = new Provider();
- provider.setId("id:providers:"+interaction.getType());
- provider.setDisplayName(interaction.getType());
- return provider;
- }
-
- public static String getUrls(Interaction interaction) {
- return null;
- }
-
- public static void addDatasiftExtension(Activity activity, Datasift datasift) {
- Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
- extensions.put("datasift", datasift);
- }
-
- public static String formatId(String... idparts) {
- return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
- }
-
- public Activity convert(Datasift event) {
-
- Activity activity = new Activity();
- activity.setActor(buildActor(event.getInteraction()));
- activity.setVerb(selectVerb(event));
- activity.setObject(buildActivityObject(event.getInteraction()));
- activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
- activity.setTarget(buildTarget(event.getInteraction()));
- activity.setPublished(event.getInteraction().getCreatedAt());
- activity.setGenerator(buildGenerator(event.getInteraction()));
- activity.setIcon(getIcon(event.getInteraction()));
- activity.setProvider(buildProvider(event.getInteraction()));
- activity.setTitle(event.getInteraction().getTitle());
- activity.setContent(event.getInteraction().getContent());
- activity.setUrl(event.getInteraction().getLink());
- activity.setLinks(getLinks(event));
- addDatasiftExtension(activity, event);
- if( event.getInteraction().getGeo() != null) {
- addLocationExtension(activity, event.getInteraction());
- }
- return activity;
- }
-
- private String selectVerb(Datasift event) {
- return "post";
- }
-
- public Actor buildActor(Interaction interaction) {
- Actor actor = new Actor();
- org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
- if(author == null) {
- LOGGER.warn("Interaction does not contain author information.");
- return actor;
- }
- String userName = author.getUsername();
- String name = author.getName();
- Long id = author.getId();
- if(userName != null) {
- actor.setDisplayName(userName);
- } else {
- actor.setDisplayName(name);
- }
-
- if(id != null) {
- actor.setId(id.toString());
- } else {
- if(userName != null)
- actor.setId(userName);
- else
- actor.setId(name);
- }
- Image image = new Image();
- image.setUrl(interaction.getAuthor().getAvatar());
- actor.setImage(image);
- if (interaction.getAuthor().getLink()!=null){
- actor.setUrl(interaction.getAuthor().getLink());
- }
- return actor;
- }
-
- public static ActivityObject buildActivityObject(Interaction interaction) {
- ActivityObject actObj = new ActivityObject();
- actObj.setObjectType(interaction.getContenttype());
- actObj.setUrl(interaction.getLink());
- actObj.setId(formatId("post",interaction.getId()));
- actObj.setContent(interaction.getContent());
-
- return actObj;
- }
-
- public static List<String> getLinks(Datasift event) {
- List<String> result = Lists.newArrayList();
- Links links = event.getLinks();
- if(links == null)
- return null;
- for(Object link : links.getNormalizedUrl()) {
- if(link != null) {
- if(link instanceof String) {
- result.add((String) link);
- } else {
- LOGGER.warn("link is not of type String : {}", link.getClass().getName());
- }
- }
- }
- return result;
- }
-
- public static ActivityObject buildTarget(Interaction interaction) {
- return null;
- }
-
- public static void addLocationExtension(Activity activity, Interaction interaction) {
- Map<String, Object> extensions = ensureExtensions(activity);
- Map<String, Object> location = new HashMap<String, Object>();
- Map<String, Double> coordinates = new HashMap<String, Double>();
- coordinates.put("latitude", interaction.getGeo().getLatitude());
- coordinates.put("longitude", interaction.getGeo().getLongitude());
- location.put("coordinates", coordinates);
- extensions.put("location", location);
- }
-
- public static String firstStringIfNotNull(List<Object> list) {
- if( list != null && list.size() > 0) {
- return (String) list.get(0);
- } else return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
index cb44df2..d121d65 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
@@ -39,10 +39,16 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
/**
*
*/
-public class DatasiftInstagramActivitySerializer extends DatasiftDefaultActivitySerializer {
+public class DatasiftInstagramActivitySerializer extends DatasiftInteractionActivitySerializer {
private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivitySerializer.class);
+ private static DatasiftInstagramActivitySerializer instance = new DatasiftInstagramActivitySerializer();
+
+ public static DatasiftInstagramActivitySerializer getInstance() {
+ return instance;
+ }
+
@Override
public Activity convert(Datasift event) {
Activity activity = super.convert(event);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 3c7abda..6fd19e7 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -46,10 +46,16 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
/**
*
*/
-public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySerializer {
+public class DatasiftTweetActivitySerializer extends DatasiftInteractionActivitySerializer {
private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftTweetActivitySerializer.class);
+ private static DatasiftTweetActivitySerializer instance = new DatasiftTweetActivitySerializer();
+
+ public static DatasiftTweetActivitySerializer getInstance() {
+ return instance;
+ }
+
@Override
public Activity convert(Datasift event) {
Activity activity = new Activity();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
index c5f2abf..93ab28b 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
@@ -33,16 +33,21 @@ import java.io.IOException;
/**
* Created by sblackmon on 3/27/14.
+ *
+ * Deprecated: Use StreamsJacksonMapper instead
*/
+@Deprecated()
public class StreamsDatasiftMapper extends StreamsJacksonMapper {
- public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z");
+ public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z";
+
+ public static final DateTimeFormatter DATASIFT_FORMATTER = DateTimeFormat.forPattern(DATASIFT_FORMAT);
public static final Long getMillis(String dateTime) {
// this function is for pig which doesn't handle exceptions well
try {
- Long result = DATASIFT_FORMAT.parseMillis(dateTime);
+ Long result = DATASIFT_FORMATTER.parseMillis(dateTime);
return result;
} catch( Exception e ) {
return null;
@@ -66,7 +71,7 @@ public class StreamsDatasiftMapper extends StreamsJacksonMapper {
public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
DateTime result = null;
try {
- result = DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
+ result = DATASIFT_FORMATTER.parseDateTime(jpar.getValueAsString());
} catch (Exception e) {}
if (result == null) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
index c4422db..750915e 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
@@ -20,7 +20,9 @@ package com.datasift.test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.junit.Assert;
import org.junit.Ignore;
@@ -31,6 +33,8 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
/**
*
@@ -39,18 +43,11 @@ public class DatasiftSerDeTest {
private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftSerDeTest.class);
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
-
-
-
- @Test @Ignore
+ @Test
public void Tests()
{
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
InputStream is = DatasiftSerDeTest.class.getResourceAsStream("/part-r-00000.json");
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
@@ -60,6 +57,7 @@ public class DatasiftSerDeTest {
String line = br.readLine();
LOGGER.debug(line);
System.out.println(line);
+
Datasift ser = mapper.readValue(line, Datasift.class);
String de = mapper.writeValueAsString(ser);
@@ -68,7 +66,7 @@ public class DatasiftSerDeTest {
Datasift serde = mapper.readValue(de, Datasift.class);
-// Assert.assertEquals(ser, serde);
+ Assert.assertEquals(ser, serde);
LOGGER.debug(mapper.writeValueAsString(serde));
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
deleted file mode 100644
index 015f4e9..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.processor.DatasiftTypeConverterProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Test;
-
-import java.util.List;
-
-import static junit.framework.Assert.*;
-
-/**
- *
- */
-public class DatasiftTypeConverterProcessorTest {
-
- private static final String DATASIFT_JSON = "{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\
"confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu
...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit
le\":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, pero mientras estemos aqui debemos BAILAR!!! #ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709
31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq…\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificación en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_
https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}";
-
- @Test
- public void testTypeConverterToString() {
- final String ID = "1";
- StreamsProcessor processor = new DatasiftTypeConverterProcessor(String.class);
- processor.prepare(null);
- StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
- List<StreamsDatum> result = processor.process(datum);
- assertNotNull(result);
- assertEquals(1, result.size());
- StreamsDatum resultDatum = result.get(0);
- assertNotNull(resultDatum);
- assertNotNull(resultDatum.getDocument());
- assertTrue(resultDatum.getDocument() instanceof String);
- assertEquals(ID, resultDatum.getId());
- }
-
- @Test
- public void testTypeConverterToActivity() {
- final String ID = "1";
- StreamsProcessor processor = new DatasiftTypeConverterProcessor(Activity.class);
- processor.prepare(null);
- StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
- List<StreamsDatum> result = processor.process(datum);
- assertNotNull(result);
- assertEquals(1, result.size());
- StreamsDatum resultDatum = result.get(0);
- assertNotNull(resultDatum);
- assertNotNull(resultDatum.getDocument());
- assertTrue(resultDatum.getDocument() instanceof Activity);
- assertEquals(ID, resultDatum.getId());
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
index 5f9feed..162526b 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
@@ -2,8 +2,10 @@ package org.apache.streams.datasift.serializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.Actor;
import org.junit.Test;
@@ -16,7 +18,8 @@ import static org.junit.Assert.assertNotNull;
public class DatasiftActivitySerializerTest {
private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer();
- private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
+
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
@Test
public void testGeneralConversion() throws Exception {
@@ -42,6 +45,7 @@ public class DatasiftActivitySerializerTest {
testGeneralConversion(line);
testDeserNoNull(line);
testDeserNoAddProps(line);
+
System.out.println("ORIGINAL -> "+line);
System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line)));
System.out.println("NODE -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
new file mode 100644
index 0000000..2004654
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.instagram.Instagram;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+/**
+ * Created by sblackmon on 12/13/13.
+ */
+public class DatasiftEventClassifierTest {
+
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
+
+ @Test
+ public void testTwitterDetection() throws Exception {
+ Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
+ String line = null;
+ while(scanner.hasNextLine()) {
+ line = scanner.nextLine();
+ Datasift datasift = MAPPER.readValue(line, Datasift.class);
+ assert(DatasiftEventClassifier.detectClass(datasift) == Twitter.class);
+ assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftTweetActivitySerializer);
+ }
+ }
+
+ @Test
+ public void testInstagramDetection() throws Exception {
+ Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
+ String line = null;
+ while(scanner.hasNextLine()) {
+ line = scanner.nextLine();
+ Datasift datasift = MAPPER.readValue(line, Datasift.class);
+ assert(DatasiftEventClassifier.detectClass(datasift) == Instagram.class);
+ assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftInstagramActivitySerializer);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index edf4959..9c99a92 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -50,6 +50,12 @@
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
+ <artifactId>streams-processor-jackson</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
deleted file mode 100644
index fb4615f..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.*;
-import org.apache.streams.util.ComponentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterEventProcessor implements StreamsProcessor {
-
- private final static String STREAMS_ID = "TwitterEventProcessor";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
-
- private ObjectMapper mapper = new StreamsTwitterMapper();
-
- private Class inClass;
- private Class outClass;
-
- private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
-
- public TwitterEventProcessor(Class inClass, Class outClass) {
- this.inClass = inClass;
- this.outClass = outClass;
- }
-
- public TwitterEventProcessor( Class outClass) {
- this(null, outClass);
- }
-
- public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
- Object result = null;
-
- Preconditions.checkNotNull(event);
- Preconditions.checkNotNull(mapper);
- Preconditions.checkNotNull(twitterJsonActivitySerializer);
-
- if( outClass.equals( Activity.class )) {
- LOGGER.debug("ACTIVITY");
- result = twitterJsonActivitySerializer.deserialize(
- mapper.writeValueAsString(event));
- } else if( outClass.equals( Tweet.class )) {
- if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("TWEET");
- result = mapper.convertValue(event, Tweet.class);
- }
- } else if( outClass.equals( Retweet.class )) {
- if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("RETWEET");
- result = mapper.convertValue(event, Retweet.class);
- }
- } else if( outClass.equals( Delete.class )) {
- if ( inClass.equals( Delete.class )) {
- LOGGER.debug("DELETE");
- result = mapper.convertValue(event, Delete.class);
- }
- } else if( outClass.equals( ObjectNode.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.convertValue(event, ObjectNode.class);
- }
-
- // no supported conversion were applied
- if( result != null )
- return result;
-
- LOGGER.debug("CONVERT FAILED");
-
- return null;
-
- }
-
- public boolean validate(Object document, Class klass) {
-
- // TODO
- return true;
- }
-
- public boolean isValidJSON(final String json) {
- boolean valid = false;
- try {
- final JsonParser parser = new ObjectMapper().getJsonFactory()
- .createJsonParser(json);
- while (parser.nextToken() != null) {
- }
- valid = true;
- } catch (JsonParseException jpe) {
- LOGGER.warn("validate: {}", jpe);
- } catch (IOException ioe) {
- LOGGER.warn("validate: {}", ioe);
- }
-
- return valid;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- // first check for valid json
- ObjectNode node = (ObjectNode) entry.getDocument();
-
- LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
-
- String json = null;
- try {
- json = mapper.writeValueAsString(node);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
-
- if( StringUtils.isNotEmpty(json)) {
-
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass(json);
-
- // if the target is string, just pass-through
- if (java.lang.String.class.equals(outClass))
- return Lists.newArrayList(new StreamsDatum(json));
- else {
- // convert to desired format
- Object out = null;
- try {
- out = convert(node, inClass, outClass);
- } catch (ActivitySerializerException e) {
- LOGGER.warn("Failed deserializing", e);
- return Lists.newArrayList();
- } catch (JsonProcessingException e) {
- LOGGER.warn("Failed parsing JSON", e);
- return Lists.newArrayList();
- }
-
- if (out != null && validate(out, outClass))
- return Lists.newArrayList(new StreamsDatum(out));
- }
- }
-
- return Lists.newArrayList();
-
- }
-
- @Override
- public void prepare(Object configurationObject) {
- mapper = new StreamsJacksonMapper();
- twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
- }
-
- @Override
- public void cleanUp() {
-
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
deleted file mode 100644
index 674eef1..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-
-public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
-
- private ObjectMapper mapper = new StreamsTwitterMapper();
-
- private Queue<StreamsDatum> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- public final static String TERMINATE = new String("TERMINATE");
-
- @Override
- public void run() {
-
- while(true) {
- StreamsDatum item;
- try {
- item = inQueue.poll();
- if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
- LOGGER.info("Terminating!");
- break;
- }
-
- Thread.sleep(new Random().nextInt(100));
-
- for( StreamsDatum entry : process(item)) {
- outQueue.offer(entry);
- }
-
-
- } catch (Exception e) {
- e.printStackTrace();
-
- }
- }
- }
-
- public StreamsDatum createStreamsDatum(User user) {
- return new StreamsDatum(user, user.getIdStr());
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- List<StreamsDatum> result = Lists.newArrayList();
- String item;
- try {
- // first check for valid json
- // since data is coming from outside provider, we don't know what type the events are
- if( entry.getDocument() instanceof String) {
- item = (String) entry.getDocument();
- } else {
- item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
- }
-
- Class inClass = TwitterEventClassifier.detectClass(item);
-
- User user;
-
- if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("TWEET");
- Tweet tweet = mapper.readValue(item, Tweet.class);
- user = tweet.getUser();
- result.add(createStreamsDatum(user));
- }
- else if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("RETWEET");
- Retweet retweet = mapper.readValue(item, Retweet.class);
- user = retweet.getRetweetedStatus().getUser();
- result.add(createStreamsDatum(user));
- } else if ( inClass.equals( User.class )) {
- LOGGER.debug("USER");
- user = mapper.readValue(item, User.class);
- result.add(createStreamsDatum(user));
- } else {
- return Lists.newArrayList();
- }
-
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn("Error processing " + entry.toString());
- return Lists.newArrayList();
- }
- }
-
- @Override
- public void prepare(Object o) {
-
- }
-
- @Override
- public void cleanUp() {
-
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
deleted file mode 100644
index 74cce27..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterTypeConverter implements StreamsProcessor {
-
- public final static String STREAMS_ID = "TwitterTypeConverter";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
-
- private ObjectMapper mapper;
-
- private Queue<StreamsDatum> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- private Class inClass;
- private Class outClass;
-
- private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
-
- private int count = 0;
-
- public final static String TERMINATE = new String("TERMINATE");
-
- public TwitterTypeConverter(Class inClass, Class outClass) {
- this.inClass = inClass;
- this.outClass = outClass;
- }
-
- public Queue<StreamsDatum> getProcessorOutputQueue() {
- return outQueue;
- }
-
- public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
- inQueue = inputQueue;
- }
-
- public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
- Object result = null;
-
- if( outClass.equals( Activity.class )) {
- LOGGER.debug("ACTIVITY");
- result = twitterJsonActivitySerializer.deserialize(
- mapper.writeValueAsString(event));
- } else if( outClass.equals( Tweet.class )) {
- if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("TWEET");
- result = mapper.convertValue(event, Tweet.class);
- }
- } else if( outClass.equals( Retweet.class )) {
- if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("RETWEET");
- result = mapper.convertValue(event, Retweet.class);
- }
- } else if( outClass.equals( Delete.class )) {
- if ( inClass.equals( Delete.class )) {
- LOGGER.debug("DELETE");
- result = mapper.convertValue(event, Delete.class);
- }
- } else if( outClass.equals( ObjectNode.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.convertValue(event, ObjectNode.class);
- }
-
- // no supported conversion were applied
- if( result != null ) {
- count ++;
- return result;
- }
-
- LOGGER.debug("CONVERT FAILED");
-
- return null;
-
- }
-
- public boolean validate(Object document, Class klass) {
-
- // TODO
- return true;
- }
-
- public boolean isValidJSON(final String json) {
- boolean valid = false;
- try {
- final JsonParser parser = new ObjectMapper().getJsonFactory()
- .createJsonParser(json);
- while (parser.nextToken() != null) {
- }
- valid = true;
- } catch (JsonParseException jpe) {
- LOGGER.warn("validate: {}", jpe);
- } catch (IOException ioe) {
- LOGGER.warn("validate: {}", ioe);
- }
-
- return valid;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- StreamsDatum result = null;
-
- try {
-
- Object item = entry.getDocument();
- ObjectNode node;
-
- LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-
- if( item instanceof String ) {
-
- // if the target is string, just pass-through
- if( String.class.equals(outClass)) {
- result = entry;
- }
- else {
- // first check for valid json
- node = (ObjectNode)mapper.readTree((String)item);
-
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass((String) item);
-
- Object out = convert(node, inClass, outClass);
-
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
- }
-
- } else if( item instanceof ObjectNode ) {
-
- // first check for valid json
- node = (ObjectNode)mapper.valueToTree(item);
-
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item));
-
- Object out = convert(node, inClass, outClass);
-
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
-
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if( result != null )
- return Lists.newArrayList(result);
- else
- return Lists.newArrayList();
- }
-
- @Override
- public void prepare(Object o) {
- mapper = new StreamsTwitterMapper();
- twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
- }
-
- @Override
- public void cleanUp() {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index 432a047..2234739 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -18,41 +18,38 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.pojo.*;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserstreameventActivitySerializer;
import java.io.IOException;
+import java.io.Serializable;
/**
* Created by sblackmon on 12/13/13.
*/
-public class TwitterEventClassifier {
+public class TwitterEventClassifier implements Serializable {
- public static Class detectClass( String json ) {
+ private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+ public static Class detectClass( String json ) {
Preconditions.checkNotNull(json);
Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-// try {
-// JsonAssert.with(json).assertNull("$.delete");
-// } catch( AssertionError ae ) {
-// return Delete.class;
-// }
-//
-// try {
-// JsonAssert.with(json).assertNull("$.retweeted_status");
-// } catch( AssertionError ae ) {
-// return Retweet.class;
-// }
-//
-// return Tweet.class;
-
ObjectNode objectNode;
try {
- objectNode = (ObjectNode) StreamsTwitterMapper.getInstance().readTree(json);
+ objectNode = (ObjectNode) mapper.readTree(json);
} catch (IOException e) {
e.printStackTrace();
return null;
@@ -72,4 +69,31 @@ public class TwitterEventClassifier {
else
return Tweet.class;
}
+ public static ActivitySerializer bestSerializer( String json ) {
+
+ Preconditions.checkNotNull(json);
+ Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+ ObjectNode objectNode;
+ try {
+ objectNode = (ObjectNode) mapper.readTree(json);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
+ return TwitterJsonRetweetActivitySerializer.getInstance();
+ else if( objectNode.findValue("delete") != null )
+ return TwitterJsonDeleteActivitySerializer.getInstance();
+// else if( objectNode.findValue("friends") != null ||
+// objectNode.findValue("friends_str") != null )
+// return FriendList.class;
+ else if( objectNode.findValue("target_object") != null )
+ return TwitterJsonUserstreameventActivitySerializer.getInstance();
+ else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
+ return TwitterJsonUserActivitySerializer.getInstance();
+ else
+ return TwitterJsonTweetActivitySerializer.getInstance();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
index 2ed16ba..395bd95 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -40,16 +40,21 @@ import java.io.IOException;
/**
* Created by sblackmon on 3/27/14.
+ *
+ * Deprecated: Use StreamsJacksonMapper
*/
+@Deprecated
public class StreamsTwitterMapper extends StreamsJacksonMapper {
- public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+ public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
+
+ public static final DateTimeFormatter TWITTER_FORMATTER = DateTimeFormat.forPattern(TWITTER_FORMAT);
public static final Long getMillis(String dateTime) {
// this function is for pig which doesn't handle exceptions well
try {
- Long result = TWITTER_FORMAT.parseMillis(dateTime);
+ Long result = TWITTER_FORMATTER.parseMillis(dateTime);
return result;
} catch( Exception e ) {
return null;
@@ -73,7 +78,7 @@ public class StreamsTwitterMapper extends StreamsJacksonMapper {
public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
DateTime result = null;
try {
- result = TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
+ result = TWITTER_FORMATTER.parseDateTime(jpar.getValueAsString());
} catch( Exception e ) { }
try {
result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index 5b5be96..d1f0de9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -35,10 +35,11 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
}
- TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
- TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
- TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
- TwitterJsonUserActivitySerializer userActivitySerializer = new TwitterJsonUserActivitySerializer();
+ private static TwitterJsonActivitySerializer instance = new TwitterJsonActivitySerializer();
+
+ public static TwitterJsonActivitySerializer getInstance() {
+ return instance;
+ }
@Override
public String serializationFormat() {
@@ -53,18 +54,11 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
@Override
public Activity deserialize(String serialized) throws ActivitySerializerException {
- Class documentSubType = TwitterEventClassifier.detectClass(serialized);
+ ActivitySerializer serializer = TwitterEventClassifier.bestSerializer(serialized);
+ Activity activity = serializer.deserialize(serialized);
- Activity activity;
- if( documentSubType == Tweet.class )
- activity = tweetActivitySerializer.deserialize(serialized);
- else if( documentSubType == Retweet.class )
- activity = retweetActivitySerializer.deserialize(serialized);
- else if( documentSubType == Delete.class )
- activity = deleteActivitySerializer.deserialize(serialized);
- else if( documentSubType == User.class )
- activity = userActivitySerializer.deserialize(serialized);
- else throw new ActivitySerializerException("unrecognized type");
+ if( activity == null )
+ throw new ActivitySerializerException("unrecognized type");
return activity;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index cb1618a..b368f71 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -46,6 +46,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
*/
public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable {
+ private static TwitterJsonDeleteActivitySerializer instance = new TwitterJsonDeleteActivitySerializer();
+
+ public static TwitterJsonDeleteActivitySerializer getInstance() {
+ return instance;
+ }
+
@Override
public String serializationFormat() {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 4f141bb..58cb769 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -37,6 +37,12 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
}
+ private static TwitterJsonRetweetActivitySerializer instance = new TwitterJsonRetweetActivitySerializer();
+
+ public static TwitterJsonRetweetActivitySerializer getInstance() {
+ return instance;
+ }
+
@Override
public String serializationFormat() {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index 8e58326..e6fc05f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -34,6 +34,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable {
+ private static TwitterJsonTweetActivitySerializer instance = new TwitterJsonTweetActivitySerializer();
+
+ public static TwitterJsonTweetActivitySerializer getInstance() {
+ return instance;
+ }
+
@Override
public String serializationFormat() {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
index 2ae5355..1bf935c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
@@ -36,6 +36,12 @@ public class TwitterJsonUserActivitySerializer implements ActivitySerializer<Str
public TwitterJsonUserActivitySerializer() {}
+ private static TwitterJsonUserActivitySerializer instance = new TwitterJsonUserActivitySerializer();
+
+ public static TwitterJsonUserActivitySerializer getInstance() {
+ return instance;
+ }
+
@Override
public String serializationFormat() {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
index edaec01..e2832dd 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
@@ -44,6 +44,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
*/
public class TwitterJsonUserstreameventActivitySerializer implements ActivitySerializer<String> {
+ private static TwitterJsonUserstreameventActivitySerializer instance = new TwitterJsonUserstreameventActivitySerializer();
+
+ public static TwitterJsonUserstreameventActivitySerializer getInstance() {
+ return instance;
+ }
+
@Override
public String serializationFormat() {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index 2d18db9..6b62fe3 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -21,13 +21,15 @@ package org.apache.streams.twitter.test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.TypeConverterProcessor;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.processor.TwitterTypeConverter;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
import org.junit.Assert;
@@ -55,14 +57,13 @@ import static org.junit.Assert.assertThat;
public class SimpleTweetTest {
private final static Logger LOGGER = LoggerFactory.getLogger(SimpleTweetTest.class);
- private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_count\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UNDERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. Submission Email: ughhblog@gmail.com \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}";
private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
-
- // @Ignore
@Test
public void Tests()
{
@@ -100,7 +101,7 @@ public class SimpleTweetTest {
}
try {
- TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class);
+ TypeConverterProcessor converter = new TypeConverterProcessor(String.class, Activity.class);
converter.prepare(null);
converter.process(new StreamsDatum(TWITTER_JSON));
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
index c7f6434..d6af4d9 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
@@ -21,7 +21,9 @@ package org.apache.streams.twitter.test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
@@ -52,11 +54,11 @@ import static org.junit.Assert.assertThat;
public class TweetActivitySerDeTest {
private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class);
- private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
- // @Ignore
@Test
public void Tests()
{
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index d0a6714..eba5fd0 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
@@ -54,11 +56,11 @@ import static org.junit.Assert.assertThat;
public class TweetSerDeTest {
private final static Logger LOGGER = LoggerFactory.getLogger(TweetSerDeTest.class);
- private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
- // @Ignore
@Test
public void Tests()
{