You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/30 16:58:07 UTC

[2/2] git commit: Added new converter

Added new converter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e6ffe29e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e6ffe29e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e6ffe29e

Branch: refs/heads/master
Commit: e6ffe29e8017592f6df881c8f8efc546041536be
Parents: 40ab159
Author: mfranklin <mf...@apache.org>
Authored: Wed Apr 30 10:56:02 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Apr 30 10:56:02 2014 -0400

----------------------------------------------------------------------
 .../streams/mongo/MongoPersistWriter.java       |   3 +-
 .../SysomosBeatActivityConverter.java           | 137 +++++++++++++++++++
 .../sysomos/proessor/SysomosTypeConverter.java  |  56 ++++++++
 .../apache/streams/data/ActivitySerializer.java |   1 +
 4 files changed, 196 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index a4a089a..80541e8 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -14,6 +14,7 @@ import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,7 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
     private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
     private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
new file mode 100644
index 0000000..e084946
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.conversion;
+
+import com.google.common.collect.Maps;
+import com.sysomos.xml.BeatApi;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+import org.joda.time.DateTime;
+
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.*;
+
+/**
+ * Converts an instance of a {@link com.sysomos.xml.BeatApi.BeatResponse.Beat} to an {@link org.apache.streams.pojo.json.Activity}
+ */
+public class SysomosBeatActivityConverter {
+
+    public static final String LANGUAGE_KEY = "LANGUAGE";
+
+    public Activity convert(BeatApi.BeatResponse.Beat beat) {
+        Activity converted = new Activity();
+        converted.setId(beat.getDocid());
+        converted.setVerb("posted");
+        converted.setContent(beat.getContent());
+        converted.setTitle(beat.getTitle());
+        converted.setPublished(new DateTime(beat.getTime()));
+        converted.setUrl(beat.getLink());
+        converted.setActor(new Actor());
+        Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags = mapTags(beat);
+        Map<String, Object> extensions = ensureExtensions(converted);
+        setLocation(beat, extensions);
+        setObject(beat, converted);
+        setProvider(beat, converted);
+        setLanguage(mappedTags, extensions);
+        extensions.put("sysomos", beat);
+
+        setChannelSpecificValues(beat, converted, mappedTags);
+
+        return converted;
+    }
+
+    protected void setChannelSpecificValues(BeatApi.BeatResponse.Beat beat, Activity converted, Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags) {
+        String mediaType = beat.getMediaType();
+        String lowerMediaType = mediaType.toLowerCase();
+        Actor actor = converted.getActor();
+        ActivityObject object = converted.getObject();
+        if ("TWITTER".equals(mediaType)) {
+            actor.setId(getPersonId(lowerMediaType, beat.getHost()));
+            actor.setDisplayName(beat.getHost());
+            actor.setUrl("http://twitter.com/" + beat.getHost());
+            object.setObjectType("tweet");
+            object.setId(getObjectId(lowerMediaType, "tweet", beat.getTweetid()));
+        } else if ("FACEBOOK".equals(mediaType)) {
+            actor.setId(getPersonId(lowerMediaType, mappedTags.get("FBID").getValue()));
+            actor.setDisplayName(beat.getTitle());
+            actor.setUrl(beat.getHost());
+            object.setObjectType("post");
+            object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+        } else {
+            actor.setId(null);
+            actor.setDisplayName(null);
+            actor.setUrl(null);
+            object.setObjectType("post");
+            object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+        }
+    }
+
+    protected void setLanguage(Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags, Map<String, Object> extensions) {
+        if(mappedTags.containsKey(LANGUAGE_KEY)) {
+            extensions.put(LANGUAGE_EXTENSION, mappedTags.get(LANGUAGE_KEY).getValue());
+        }
+    }
+
+    protected void setObject(BeatApi.BeatResponse.Beat beat, Activity converted) {
+        ActivityObject object = new ActivityObject();
+        converted.setObject(object);
+        object.setUrl(beat.getLink());
+        object.setContent(beat.getContent());
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void setLocation(BeatApi.BeatResponse.Beat beat, Map<String, Object> extensions) {
+        Map<String, Object> location;
+        String country = beat.getLocation().getCountry();
+        if(StringUtils.isNotBlank(country)) {
+            if (extensions.containsKey(LOCATION_EXTENSION)) {
+                location = (Map<String, Object>) extensions.get(LOCATION_EXTENSION);
+            } else {
+                location = Maps.newHashMap();
+                extensions.put(LOCATION_EXTENSION, location);
+            }
+            location.put(LOCATION_EXTENSION_COUNTRY, country);
+        }
+    }
+
+    protected void setProvider(BeatApi.BeatResponse.Beat beat, Activity converted) {
+        Provider provider = new Provider();
+        String mediaType = beat.getMediaType().toLowerCase();
+        provider.setId(getProviderId(mediaType));
+        provider.setDisplayName(StringUtils.capitalize(mediaType));
+        converted.setProvider(provider);
+    }
+
+    protected Map<String, BeatApi.BeatResponse.Beat.Tag> mapTags(BeatApi.BeatResponse.Beat beat) {
+        Map<String, BeatApi.BeatResponse.Beat.Tag> tags = Maps.newHashMap();
+        for(BeatApi.BeatResponse.Beat.Tag tag : beat.getTag()) {
+            if(tag.getSystemType() != null) {
+                tags.put(tag.getSystemType(), tag);
+            }
+        }
+        return tags;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
new file mode 100644
index 0000000..187d402
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.org.apache.streams.sysomos.proessor;
+
+import com.google.common.collect.Lists;
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
+
+import java.util.List;
+
+/**
+ * Stream processor that converts Sysomos type to Activity
+ */
+public class SysomosTypeConverter implements StreamsProcessor {
+
+    private SysomosBeatActivityConverter converter;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
+            entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
+            return Lists.newArrayList(entry);
+        } else {
+            return Lists.newArrayList();
+        }
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        converter = new SysomosBeatActivityConverter();
+    }
+
+    @Override
+    public void cleanUp() {
+        //NOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
index ad3809f..23903e5 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
@@ -23,6 +23,7 @@ import org.apache.streams.pojo.json.Activity;
 
 import java.util.List;
 
+//TODO:  Change the name of this class to ActivityConverter  STREAMS-68
 /**
  * Serializes and deserializes Activities
  */