You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/23 21:11:49 UTC
[04/28] incubator-streams git commit: omni-bus update
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivityConverter.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivityConverter.java
new file mode 100644
index 0000000..112b6a5
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivityConverter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
+import com.moreover.api.ObjectFactory;
+import org.apache.commons.lang.SerializationException;
+import org.apache.streams.data.util.MoreoverUtils;
+import org.apache.streams.pojo.json.Activity;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
+ */
+public class MoreoverXmlActivityConverter implements ActivityConverter<String> {
+
+ //JAXBContext is threadsafe (supposedly)
+ private final JAXBContext articleContext;
+ private final JAXBContext articlesContext;
+
+ public MoreoverXmlActivityConverter() {
+ articleContext = createContext(Article.class);
+ articlesContext = createContext(ArticlesResponse.class);
+ }
+
+ @Override
+ public String serializationFormat() {
+ return "application/xml+vnd.moreover.com.v1";
+ }
+
+ @Override
+ public String serialize(Activity deserialized) {
+ throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
+ }
+
+ @Override
+ public Activity deserialize(String serialized) {
+ Article article = deserializeMoreover(serialized);
+ return MoreoverUtils.convert(article);
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<String> serializedList) {
+ List<Activity> activities = new LinkedList<Activity>();
+ for(String item : serializedList) {
+ ArticlesResponse response = deserializeMoreoverResponse(item);
+ for(Article article : response.getArticles().getArticle()) {
+ activities.add(MoreoverUtils.convert(article));
+ }
+ }
+ return activities;
+ }
+
+ private Article deserializeMoreover(String serialized){
+ try {
+ Unmarshaller unmarshaller = articleContext.createUnmarshaller();
+ return (Article) unmarshaller.unmarshal(new StringReader(serialized));
+ } catch (JAXBException e) {
+ throw new SerializationException("Unable to deserialize Moreover data", e);
+ }
+ }
+
+ private ArticlesResponse deserializeMoreoverResponse(String serialized){
+ try {
+ Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
+ return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue();
+ } catch (JAXBException e) {
+ throw new SerializationException("Unable to deserialize Moreover data", e);
+ }
+ }
+
+ private JAXBContext createContext(Class articleClass) {
+ JAXBContext context;
+ try {
+ context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
+ } catch (JAXBException e) {
+ throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
+ }
+ return context;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
deleted file mode 100644
index d60bcb8..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.moreover.api.Article;
-import com.moreover.api.ArticlesResponse;
-import com.moreover.api.ObjectFactory;
-import org.apache.commons.lang.SerializationException;
-import org.apache.streams.data.util.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
- */
-public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
-
- //JAXBContext is threadsafe (supposedly)
- private final JAXBContext articleContext;
- private final JAXBContext articlesContext;
-
- public MoreoverXmlActivitySerializer() {
- articleContext = createContext(Article.class);
- articlesContext = createContext(ArticlesResponse.class);
- }
-
- @Override
- public String serializationFormat() {
- return "application/xml+vnd.moreover.com.v1";
- }
-
- @Override
- public String serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
- }
-
- @Override
- public Activity deserialize(String serialized) {
- Article article = deserializeMoreover(serialized);
- return MoreoverUtils.convert(article);
- }
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- List<Activity> activities = new LinkedList<Activity>();
- for(String item : serializedList) {
- ArticlesResponse response = deserializeMoreoverResponse(item);
- for(Article article : response.getArticles().getArticle()) {
- activities.add(MoreoverUtils.convert(article));
- }
- }
- return activities;
- }
-
- private Article deserializeMoreover(String serialized){
- try {
- Unmarshaller unmarshaller = articleContext.createUnmarshaller();
- return (Article) unmarshaller.unmarshal(new StringReader(serialized));
- } catch (JAXBException e) {
- throw new SerializationException("Unable to deserialize Moreover data", e);
- }
- }
-
- private ArticlesResponse deserializeMoreoverResponse(String serialized){
- try {
- Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
- return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue();
- } catch (JAXBException e) {
- throw new SerializationException("Unable to deserialize Moreover data", e);
- }
- }
-
- private JAXBContext createContext(Class articleClass) {
- JAXBContext context;
- try {
- context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
- } catch (JAXBException e) {
- throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
- }
- return context;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
new file mode 100644
index 0000000..a9e8e3a
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.data.util.JsonUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.streams.data.util.MoreoverTestUtil.test;
+import static org.junit.Assert.assertThat;
+
+public class MoreoverJsonActivityConverterTest {
+ JsonNode json;
+ ActivityConverter serializer = new MoreoverJsonActivityConverter();
+ ObjectMapper mapper;
+
+ @Before
+ public void setup() throws IOException {
+ json = JsonUtil.getFromFile("classpath:org/apache/streams/data/moreover.json");
+
+ mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+ }
+
+ @Test
+ public void loadData() throws Exception {
+ for (JsonNode item : json) {
+ test(serializer.deserialize(getString(item)));
+ }
+ }
+
+
+ private String getString(JsonNode jsonNode) {
+ try {
+ return new ObjectMapper().writeValueAsString(jsonNode);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
deleted file mode 100644
index f5d66b1..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.data.util.JsonUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Pattern;
-
-import static java.util.regex.Pattern.matches;
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-public class MoreoverJsonActivitySerializerTest {
- JsonNode json;
- ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
- ObjectMapper mapper;
-
- @Before
- public void setup() throws IOException {
- json = JsonUtil.getFromFile("classpath:org/apache/streams/data/moreover.json");
-
- mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
- }
-
- @Test
- public void loadData() throws Exception {
- for (JsonNode item : json) {
- test(serializer.deserialize(getString(item)));
- }
- }
-
-
- private String getString(JsonNode jsonNode) {
- try {
- return new ObjectMapper().writeValueAsString(jsonNode);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
new file mode 100644
index 0000000..9834032
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static org.apache.streams.data.util.MoreoverTestUtil.test;
+
+public class MoreoverXmlActivityConverterTest {
+ ActivityConverter serializer;
+ private String xml;
+
+ @Before
+ public void setup() throws IOException {
+ serializer = new MoreoverXmlActivityConverter();
+ xml = loadXml();
+ }
+
+ @Test
+ public void loadData() throws Exception {
+ List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
+ for (Activity activity : activities) {
+ test(activity);
+ }
+ }
+
+ private String loadXml() throws IOException {
+ StringWriter writer = new StringWriter();
+ InputStream resourceAsStream = this.getClass().getResourceAsStream("moreover.xml");
+ IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+ return writer.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
deleted file mode 100644
index dbebee2..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-
-public class MoreoverXmlActivitySerializerTest {
- ActivitySerializer serializer;
- private String xml;
-
- @Before
- public void setup() throws IOException {
- serializer = new MoreoverXmlActivitySerializer();
- xml = loadXml();
- }
-
- @Test
- public void loadData() throws Exception {
- List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
- for (Activity activity : activities) {
- test(activity);
- }
- }
-
- private String loadXml() throws IOException {
- StringWriter writer = new StringWriter();
- InputStream resourceAsStream = this.getClass().getResourceAsStream("moreover.xml");
- IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
- return writer.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
index 339b922..2c86fa0 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.apache.streams.rss.serializer.SyndEntryActivityConverter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -39,7 +39,7 @@ public class RssTypeConverter implements StreamsProcessor{
private static final Logger LOGGER = LoggerFactory.getLogger(RssTypeConverter.class);
- private SyndEntryActivitySerializer serializer;
+ private SyndEntryActivityConverter serializer;
private int successCount = 0;
private int failCount = 0;
@@ -62,7 +62,7 @@ public class RssTypeConverter implements StreamsProcessor{
@Override
public void prepare(Object o) {
- this.serializer = new SyndEntryActivitySerializer();
+ this.serializer = new SyndEntryActivityConverter();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
index 75d275d..347f593 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
@@ -19,11 +19,10 @@
package org.apache.streams.rss.provider;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.sun.syndication.feed.synd.SyndEntry;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.apache.streams.rss.serializer.SyndEntryActivityConverter;
import org.apache.streams.rss.serializer.SyndEntrySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +42,7 @@ public class RssEventProcessor implements Runnable {
private Class inClass;
private Class outClass;
- private SyndEntryActivitySerializer syndEntryActivitySerializer = new SyndEntryActivitySerializer();
+ private SyndEntryActivityConverter syndEntryActivitySerializer = new SyndEntryActivityConverter();
private SyndEntrySerializer syndEntrySerializer = new SyndEntrySerializer();
public final static String TERMINATE = new String("TERMINATE");
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java
new file mode 100644
index 0000000..9801c30
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.*;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class SyndEntryActivityConverter implements ActivityConverter<ObjectNode> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivityConverter.class);
+
+ private boolean includeRomeExtension;
+
+ public SyndEntryActivityConverter() {
+ this(true);
+ }
+
+ public SyndEntryActivityConverter(boolean includeRomeExtension) {
+ this.includeRomeExtension = includeRomeExtension;
+ }
+
+
+ @Override
+ public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
+ List<Activity> result = Lists.newLinkedList();
+ for (ObjectNode node : objectNodes) {
+ result.add(deserialize(node));
+ }
+ return result;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return "application/streams-provider-rss";
+ }
+
+ @Override
+ public ObjectNode serialize(Activity deserialized) {
+ throw new UnsupportedOperationException("Cannot currently serialize to Rome");
+ }
+
+ @Override
+ public Activity deserialize(ObjectNode syndEntry) {
+ return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
+ }
+
+ public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
+ Preconditions.checkNotNull(entry);
+
+ Activity activity = new Activity();
+ Provider provider = buildProvider(entry);
+ Actor actor = buildActor(entry);
+ ActivityObject activityObject = buildActivityObject(entry);
+
+ activityObject.setUrl(provider.getUrl());
+ activityObject.setAuthor(actor.getAuthor());
+
+ activity.setUrl(provider.getUrl());
+ activity.setProvider(provider);
+ activity.setActor(actor);
+ activity.setVerb("post");
+ activity.setId("id:rss:post:" + activity.getUrl());
+
+ JsonNode published = entry.get("publishedDate");
+ if (published != null) {
+ try {
+ activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
+ } catch (Exception e) {
+ LOGGER.warn("Failed to parse date : {}", published.textValue());
+
+ DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
+ activity.setPublished(now);
+ }
+ }
+
+ activity.setUpdated(activityObject.getUpdated());
+ activity.setObject(activityObject);
+
+ if (withExtension) {
+ activity = addRomeExtension(activity, entry);
+ }
+
+ return activity;
+ }
+
+ /**
+ * Given an RSS entry, extra out the author and actor information and return it
+ * in an actor object
+ *
+ * @param entry
+ * @return
+ */
+ private Actor buildActor(ObjectNode entry) {
+ Author author = new Author();
+ Actor actor = new Actor();
+
+ if (entry.get("author") != null) {
+ author.setId(entry.get("author").textValue());
+ author.setDisplayName(entry.get("author").textValue());
+
+ actor.setAuthor(author);
+ String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
+
+ actor.setId("id:rss:" + uriToSet + ":" + author.getId());
+ actor.setDisplayName(author.getDisplayName());
+ }
+
+ return actor;
+ }
+
+ /**
+ * Given an RSS object, build the ActivityObject
+ *
+ * @param entry
+ * @return
+ */
+ private ActivityObject buildActivityObject(ObjectNode entry) {
+ ActivityObject activityObject = new ActivityObject();
+
+ JsonNode summary = entry.get("description");
+ if (summary != null)
+ activityObject.setSummary(summary.textValue());
+ else if((summary = entry.get("title")) != null) {
+ activityObject.setSummary(summary.textValue());
+ }
+
+ return activityObject;
+ }
+
+ /**
+ * Given an RSS object, build and return the Provider object
+ *
+ * @param entry
+ * @return
+ */
+ private Provider buildProvider(ObjectNode entry) {
+ Provider provider = new Provider();
+
+ String link = null;
+ String uri = null;
+ String resourceLocation = null;
+
+ if (entry.get("link") != null)
+ link = entry.get("link").textValue();
+ if (entry.get("uri") != null)
+ uri = entry.get("uri").textValue();
+
+ /**
+ * Order of precedence for resourceLocation selection
+ *
+ * 1. Valid URI
+ * 2. Valid Link
+ * 3. Non-null URI
+ * 4. Non-null Link
+ */
+ if(isValidResource(uri))
+ resourceLocation = uri;
+ else if(isValidResource(link))
+ resourceLocation = link;
+ else if(uri != null || link != null) {
+ resourceLocation = (uri != null) ? uri : link;
+ }
+
+ provider.setId("id:providers:rss");
+ provider.setUrl(resourceLocation);
+ provider.setDisplayName("RSS");
+
+ return provider;
+ }
+
+ /**
+ * Tests whether or not the passed in resource is a valid URI
+ * @param resource
+ * @return boolean of whether or not the resource is valid
+ */
+ private boolean isValidResource(String resource) {
+ if(resource != null && resource.startsWith("http") || resource.startsWith("www"))
+ return true;
+ return false;
+ }
+
+ /**
+ * Given an RSS object and an existing activity,
+ * add the Rome extension to that activity and return it
+ *
+ * @param activity
+ * @param entry
+ * @return
+ */
+ private Activity addRomeExtension(Activity activity, ObjectNode entry) {
+ ObjectMapper mapper = new StreamsJacksonMapper();
+ ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
+ ObjectNode extensions = JsonNodeFactory.instance.objectNode();
+
+ extensions.put("rome", entry);
+ activityRoot.put("extensions", extensions);
+
+ activity = mapper.convertValue(activityRoot, Activity.class);
+
+ return activity;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
deleted file mode 100644
index 06839f3..0000000
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.rss.serializer;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.data.util.RFC3339Utils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.*;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class);
-
- private boolean includeRomeExtension;
-
- public SyndEntryActivitySerializer() {
- this(true);
- }
-
- public SyndEntryActivitySerializer(boolean includeRomeExtension) {
- this.includeRomeExtension = includeRomeExtension;
- }
-
-
- @Override
- public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
- List<Activity> result = Lists.newLinkedList();
- for (ObjectNode node : objectNodes) {
- result.add(deserialize(node));
- }
- return result;
- }
-
- @Override
- public String serializationFormat() {
- return "application/streams-provider-rss";
- }
-
- @Override
- public ObjectNode serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Rome");
- }
-
- @Override
- public Activity deserialize(ObjectNode syndEntry) {
- return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
- }
-
- public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
- Preconditions.checkNotNull(entry);
-
- Activity activity = new Activity();
- Provider provider = buildProvider(entry);
- Actor actor = buildActor(entry);
- ActivityObject activityObject = buildActivityObject(entry);
-
- activityObject.setUrl(provider.getUrl());
- activityObject.setAuthor(actor.getAuthor());
-
- activity.setUrl(provider.getUrl());
- activity.setProvider(provider);
- activity.setActor(actor);
- activity.setVerb("post");
- activity.setId("id:rss:post:" + activity.getUrl());
-
- JsonNode published = entry.get("publishedDate");
- if (published != null) {
- try {
- activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
- } catch (Exception e) {
- LOGGER.warn("Failed to parse date : {}", published.textValue());
-
- DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
- activity.setPublished(now);
- }
- }
-
- activity.setUpdated(activityObject.getUpdated());
- activity.setObject(activityObject);
-
- if (withExtension) {
- activity = addRomeExtension(activity, entry);
- }
-
- return activity;
- }
-
- /**
- * Given an RSS entry, extra out the author and actor information and return it
- * in an actor object
- *
- * @param entry
- * @return
- */
- private Actor buildActor(ObjectNode entry) {
- Author author = new Author();
- Actor actor = new Actor();
-
- if (entry.get("author") != null) {
- author.setId(entry.get("author").textValue());
- author.setDisplayName(entry.get("author").textValue());
-
- actor.setAuthor(author);
- String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
-
- actor.setId("id:rss:" + uriToSet + ":" + author.getId());
- actor.setDisplayName(author.getDisplayName());
- }
-
- return actor;
- }
-
- /**
- * Given an RSS object, build the ActivityObject
- *
- * @param entry
- * @return
- */
- private ActivityObject buildActivityObject(ObjectNode entry) {
- ActivityObject activityObject = new ActivityObject();
-
- JsonNode summary = entry.get("description");
- if (summary != null)
- activityObject.setSummary(summary.textValue());
- else if((summary = entry.get("title")) != null) {
- activityObject.setSummary(summary.textValue());
- }
-
- return activityObject;
- }
-
- /**
- * Given an RSS object, build and return the Provider object
- *
- * @param entry
- * @return
- */
- private Provider buildProvider(ObjectNode entry) {
- Provider provider = new Provider();
-
- String link = null;
- String uri = null;
- String resourceLocation = null;
-
- if (entry.get("link") != null)
- link = entry.get("link").textValue();
- if (entry.get("uri") != null)
- uri = entry.get("uri").textValue();
-
- /**
- * Order of precedence for resourceLocation selection
- *
- * 1. Valid URI
- * 2. Valid Link
- * 3. Non-null URI
- * 4. Non-null Link
- */
- if(isValidResource(uri))
- resourceLocation = uri;
- else if(isValidResource(link))
- resourceLocation = link;
- else if(uri != null || link != null) {
- resourceLocation = (uri != null) ? uri : link;
- }
-
- provider.setId("id:providers:rss");
- provider.setUrl(resourceLocation);
- provider.setDisplayName("RSS");
-
- return provider;
- }
-
- /**
- * Tests whether or not the passed in resource is a valid URI
- * @param resource
- * @return boolean of whether or not the resource is valid
- */
- private boolean isValidResource(String resource) {
- if(resource != null && resource.startsWith("http") || resource.startsWith("www"))
- return true;
- return false;
- }
-
- /**
- * Given an RSS object and an existing activity,
- * add the Rome extension to that activity and return it
- *
- * @param activity
- * @param entry
- * @return
- */
- private Activity addRomeExtension(Activity activity, ObjectNode entry) {
- ObjectMapper mapper = new StreamsJacksonMapper();
- ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
- ObjectNode extensions = JsonNodeFactory.instance.objectNode();
-
- extensions.put("rome", entry);
- activityRoot.put("extensions", extensions);
-
- activity = mapper.convertValue(activityRoot, Activity.class);
-
- return activity;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
index fd9a996..446f998 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
@@ -26,7 +26,7 @@ import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.Actor;
import org.apache.streams.pojo.json.Author;
import org.apache.streams.pojo.json.Provider;
-import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.apache.streams.rss.serializer.SyndEntryActivityConverter;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Test;
@@ -49,7 +49,7 @@ public class SyndEntryActivitySerizlizerTest {
List<Activity> activities = Lists.newLinkedList();
List<ObjectNode> objects = Lists.newLinkedList();
- SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer();
+ SyndEntryActivityConverter serializer = new SyndEntryActivityConverter();
while(scanner.hasNext()) {
String line = scanner.nextLine();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 3880135..1c0e8cb 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -50,9 +50,8 @@
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
- <artifactId>streams-processor-jackson</artifactId>
+ <artifactId>streams-converters</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 4ca73df..5ad811d 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -31,7 +31,7 @@ import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.provider.TwitterConfigurator;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterDocumentClassifier;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,7 +111,7 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
}
protected void replace(Activity doc, String json) throws java.io.IOException, ActivitySerializerException {
- Class documentSubType = TwitterEventClassifier.detectClass(json);
+ Class documentSubType = TwitterDocumentClassifier.getInstance().detectClass(json);
Object object = mapper.readValue(json, documentSubType);
if(documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
index 674eef1..bffdef0 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
@@ -26,7 +26,7 @@ import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterDocumentClassifier;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +90,7 @@ public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
}
- Class inClass = TwitterEventClassifier.detectClass(item);
+ Class inClass = TwitterDocumentClassifier.getInstance().detectClass(item);
User user;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
deleted file mode 100644
index 2234739..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.*;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonUserstreameventActivitySerializer;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Created by sblackmon on 12/13/13.
- */
-public class TwitterEventClassifier implements Serializable {
-
- private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
-
- public static Class detectClass( String json ) {
- Preconditions.checkNotNull(json);
- Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
- ObjectNode objectNode;
- try {
- objectNode = (ObjectNode) mapper.readTree(json);
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
-
- if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
- return Retweet.class;
- else if( objectNode.findValue("delete") != null )
- return Delete.class;
- else if( objectNode.findValue("friends") != null ||
- objectNode.findValue("friends_str") != null )
- return FriendList.class;
- else if( objectNode.findValue("target_object") != null )
- return UserstreamEvent.class;
- else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
- return User.class;
- else
- return Tweet.class;
- }
- public static ActivitySerializer bestSerializer( String json ) {
-
- Preconditions.checkNotNull(json);
- Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
- ObjectNode objectNode;
- try {
- objectNode = (ObjectNode) mapper.readTree(json);
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
-
- if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
- return TwitterJsonRetweetActivitySerializer.getInstance();
- else if( objectNode.findValue("delete") != null )
- return TwitterJsonDeleteActivitySerializer.getInstance();
-// else if( objectNode.findValue("friends") != null ||
-// objectNode.findValue("friends_str") != null )
-// return FriendList.class;
- else if( objectNode.findValue("target_object") != null )
- return TwitterJsonUserstreameventActivitySerializer.getInstance();
- else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
- return TwitterJsonUserActivitySerializer.getInstance();
- else
- return TwitterJsonTweetActivitySerializer.getInstance();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index bd67765..45bd071 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
@@ -31,7 +32,6 @@ import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
new file mode 100644
index 0000000..f14c9f5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.ActivityConverterResolver;
+import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.FriendList;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Created by sblackmon on 12/13/13.
+ */
+public class TwitterConverterResolver implements ActivityConverterResolver {
+
+ public TwitterConverterResolver() {
+
+ }
+
+ private static TwitterConverterResolver instance = new TwitterConverterResolver();
+
+ public static TwitterConverterResolver getInstance() {
+ return instance;
+ }
+
+ private static ObjectMapper mapper = new StreamsJacksonMapper(StreamsTwitterMapper.TWITTER_FORMAT);
+
+ @Override
+ public Class bestSerializer(Class documentClass) throws ActivitySerializerException {
+
+ if (documentClass == Retweet.class)
+ return TwitterJsonRetweetActivityConverter.class;
+ else if (documentClass == Delete.class)
+ return TwitterJsonDeleteActivityConverter.class;
+ else if (documentClass == User.class)
+ return TwitterJsonUserActivityConverter.class;
+ else if (documentClass == UserstreamEvent.class)
+ return TwitterJsonUserstreameventActivityConverter.class;
+ else return TwitterJsonTweetActivityConverter.class;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
new file mode 100644
index 0000000..b9ca789
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.*;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Created by sblackmon on 12/13/13.
+ */
+public class TwitterDocumentClassifier implements DocumentClassifier {
+
+ public TwitterDocumentClassifier() {
+
+ }
+
+ private static TwitterDocumentClassifier instance = new TwitterDocumentClassifier();
+
+ public static TwitterDocumentClassifier getInstance() {
+ return instance;
+ }
+
+ private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
+ public Class detectClass(Object document) {
+
+ Preconditions.checkNotNull(document);
+ Preconditions.checkArgument(document instanceof String);
+
+ String json = (String)document;
+ Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+ ObjectNode objectNode;
+ try {
+ objectNode = (ObjectNode) mapper.readTree(json);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
+ return Retweet.class;
+ else if( objectNode.findValue("delete") != null )
+ return Delete.class;
+ else if( objectNode.findValue("friends") != null ||
+ objectNode.findValue("friends_str") != null )
+ return FriendList.class;
+ else if( objectNode.findValue("target_object") != null )
+ return UserstreamEvent.class;
+ else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
+ return User.class;
+ else
+ return Tweet.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java
new file mode 100644
index 0000000..3e123b6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.converter.TypeConverterUtil;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.ActivityConverterFactory;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+
+import java.util.List;
+import java.io.Serializable;
+
+/*
+ * Now that we have ActivityConverterProcessor, this shouldn't be neededß
+ */
+@Deprecated
+public class TwitterJsonActivityConverter implements ActivityConverter<String>, Serializable
+{
+
+ public TwitterJsonActivityConverter() {
+
+ }
+
+ private static TwitterJsonActivityConverter instance = new TwitterJsonActivityConverter();
+
+ public static TwitterJsonActivityConverter getInstance() {
+ return instance;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public String serialize(Activity deserialized) throws ActivitySerializerException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Activity deserialize(String serialized) throws ActivitySerializerException {
+
+ Class converterClass = TwitterDocumentClassifier.getInstance().detectClass(serialized);
+
+ ActivityConverter converter = ActivityConverterFactory.getInstance(converterClass);
+
+ Object typedObject = TypeConverterUtil.convert(serialized, converterClass);
+
+ Activity activity = converter.deserialize(typedObject);
+
+ if( activity == null )
+ throw new ActivitySerializerException("unrecognized type");
+
+ return activity;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<String> serializedList) {
+ throw new NotImplementedException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
deleted file mode 100644
index d1f0de9..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.*;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-
-import java.util.List;
-import java.io.Serializable;
-
-public class TwitterJsonActivitySerializer implements ActivitySerializer<String>, Serializable
-{
-
- public TwitterJsonActivitySerializer() {
-
- }
-
- private static TwitterJsonActivitySerializer instance = new TwitterJsonActivitySerializer();
-
- public static TwitterJsonActivitySerializer getInstance() {
- return instance;
- }
-
- @Override
- public String serializationFormat() {
- return null;
- }
-
- @Override
- public String serialize(Activity deserialized) throws ActivitySerializerException {
- throw new NotImplementedException();
- }
-
- @Override
- public Activity deserialize(String serialized) throws ActivitySerializerException {
-
- ActivitySerializer serializer = TwitterEventClassifier.bestSerializer(serialized);
- Activity activity = serializer.deserialize(serialized);
-
- if( activity == null )
- throw new ActivitySerializerException("unrecognized type");
-
- return activity;
- }
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- throw new NotImplementedException();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java
new file mode 100644
index 0000000..8d8da28
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
+
+
+/**
+* Created with IntelliJ IDEA.
+* User: mdelaet
+* Date: 9/30/13
+* Time: 9:24 AM
+* To change this template use File | Settings | File Templates.
+*/
+public class TwitterJsonDeleteActivityConverter implements ActivityConverter<Delete>, Serializable {
+
+ private static TwitterJsonDeleteActivityConverter instance = new TwitterJsonDeleteActivityConverter();
+
+ public static TwitterJsonDeleteActivityConverter getInstance() {
+ return instance;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public Delete serialize(Activity deserialized) throws ActivitySerializerException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Activity deserialize(Delete serialized) throws ActivitySerializerException {
+ return null;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<Delete> serializedList) {
+ return null;
+ }
+
+ public Activity convert(Delete delete) throws ActivitySerializerException {
+
+ Activity activity = new Activity();
+ updateActivity(delete, activity);
+ return activity;
+ }
+
+ public ActivityObject buildTarget(Tweet tweet) {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
deleted file mode 100644
index b368f71..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Strings;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Tweet;
-
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
-
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable {
-
- private static TwitterJsonDeleteActivitySerializer instance = new TwitterJsonDeleteActivitySerializer();
-
- public static TwitterJsonDeleteActivitySerializer getInstance() {
- return instance;
- }
-
- @Override
- public String serializationFormat() {
- return null;
- }
-
- @Override
- public String serialize(Activity deserialized) throws ActivitySerializerException {
- throw new NotImplementedException();
- }
-
- @Override
- public Activity deserialize(String serialized) throws ActivitySerializerException {
- return null;
- }
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- return null;
- }
-
- public Activity convert(ObjectNode event) throws ActivitySerializerException {
-
- ObjectMapper mapper = StreamsTwitterMapper.getInstance();
- Delete delete = null;
- try {
- delete = mapper.treeToValue(event, Delete.class);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
-
- Activity activity = new Activity();
- updateActivity(delete, activity);
- return activity;
- }
-
- public ActivityObject buildTarget(Tweet tweet) {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java
new file mode 100644
index 0000000..4b64932
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Retweet;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
+
+public class TwitterJsonRetweetActivityConverter implements ActivityConverter<Retweet>, Serializable {
+
+ public TwitterJsonRetweetActivityConverter() {
+
+ }
+
+ private static TwitterJsonRetweetActivityConverter instance = new TwitterJsonRetweetActivityConverter();
+
+ public static TwitterJsonRetweetActivityConverter getInstance() {
+ return instance;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public Retweet serialize(Activity deserialized) throws ActivitySerializerException {
+ return null;
+ }
+
+ @Override
+ public Activity deserialize(Retweet retweet) throws ActivitySerializerException {
+
+ Activity activity = new Activity();
+ updateActivity(retweet, activity);
+
+ return activity;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<Retweet> serializedList) {
+ List<Activity> result = Lists.newArrayList();
+ for( Retweet item : serializedList ) {
+ try {
+ Activity activity = deserialize(item);
+ result.add(activity);
+ } catch (ActivitySerializerException e) {}
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
deleted file mode 100644
index 58cb769..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Retweet;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
-
-public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String>, Serializable {
-
- public TwitterJsonRetweetActivitySerializer() {
-
- }
-
- private static TwitterJsonRetweetActivitySerializer instance = new TwitterJsonRetweetActivitySerializer();
-
- public static TwitterJsonRetweetActivitySerializer getInstance() {
- return instance;
- }
-
- @Override
- public String serializationFormat() {
- return null;
- }
-
- @Override
- public String serialize(Activity deserialized) throws ActivitySerializerException {
- return null;
- }
-
- @Override
- public Activity deserialize(String event) throws ActivitySerializerException {
-
- ObjectMapper mapper = StreamsTwitterMapper.getInstance();
- Retweet retweet = null;
- try {
- retweet = mapper.readValue(event, Retweet.class);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- Activity activity = new Activity();
- updateActivity(retweet, activity);
-
- return activity;
- }
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java
new file mode 100644
index 0000000..5cd1075
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
+
+public class TwitterJsonTweetActivityConverter implements ActivityConverter<Tweet>, Serializable {
+
+ private static TwitterJsonTweetActivityConverter instance = new TwitterJsonTweetActivityConverter();
+
+ public static TwitterJsonTweetActivityConverter getInstance() {
+ return instance;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public Tweet serialize(Activity deserialized) throws ActivitySerializerException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Activity deserialize(Tweet tweet) throws ActivitySerializerException {
+
+ Activity activity = new Activity();
+
+ updateActivity(tweet, activity);
+
+ return activity;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<Tweet> serializedList) {
+ List<Activity> result = Lists.newArrayList();
+ for( Tweet item : serializedList ) {
+ try {
+ Activity activity = deserialize(item);
+ result.add(activity);
+ } catch (ActivitySerializerException e) {}
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
deleted file mode 100644
index e6fc05f..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Tweet;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
-
-public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable {
-
- private static TwitterJsonTweetActivitySerializer instance = new TwitterJsonTweetActivitySerializer();
-
- public static TwitterJsonTweetActivitySerializer getInstance() {
- return instance;
- }
-
- @Override
- public String serializationFormat() {
- return null;
- }
-
- @Override
- public String serialize(Activity deserialized) throws ActivitySerializerException {
- throw new NotImplementedException();
- }
-
- @Override
- public Activity deserialize(String serialized) throws ActivitySerializerException {
-
- ObjectMapper mapper = StreamsTwitterMapper.getInstance();
- Tweet tweet = null;
- try {
- tweet = mapper.readValue(serialized, Tweet.class);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- Activity activity = new Activity();
-
- updateActivity(tweet, activity);
-
- return activity;
- }
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- return null;
- }
-}