You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/30 16:58:07 UTC
[2/2] git commit: Added new converter
Added new converter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e6ffe29e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e6ffe29e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e6ffe29e
Branch: refs/heads/master
Commit: e6ffe29e8017592f6df881c8f8efc546041536be
Parents: 40ab159
Author: mfranklin <mf...@apache.org>
Authored: Wed Apr 30 10:56:02 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Apr 30 10:56:02 2014 -0400
----------------------------------------------------------------------
.../streams/mongo/MongoPersistWriter.java | 3 +-
.../SysomosBeatActivityConverter.java | 137 +++++++++++++++++++
.../sysomos/proessor/SysomosTypeConverter.java | 56 ++++++++
.../apache/streams/data/ActivitySerializer.java | 1 +
4 files changed, 196 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index a4a089a..80541e8 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -14,6 +14,7 @@ import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +35,7 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
new file mode 100644
index 0000000..e084946
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.conversion;
+
+import com.google.common.collect.Maps;
+import com.sysomos.xml.BeatApi;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+import org.joda.time.DateTime;
+
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.*;
+
+/**
+ * Converts an instance of a {@link com.sysomos.xml.BeatApi.BeatResponse.Beat} to an {@link org.apache.streams.pojo.json.Activity}
+ */
+public class SysomosBeatActivityConverter {
+
+ public static final String LANGUAGE_KEY = "LANGUAGE";
+
+ public Activity convert(BeatApi.BeatResponse.Beat beat) {
+ Activity converted = new Activity();
+ converted.setId(beat.getDocid());
+ converted.setVerb("posted");
+ converted.setContent(beat.getContent());
+ converted.setTitle(beat.getTitle());
+ converted.setPublished(new DateTime(beat.getTime()));
+ converted.setUrl(beat.getLink());
+ converted.setActor(new Actor());
+ Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags = mapTags(beat);
+ Map<String, Object> extensions = ensureExtensions(converted);
+ setLocation(beat, extensions);
+ setObject(beat, converted);
+ setProvider(beat, converted);
+ setLanguage(mappedTags, extensions);
+ extensions.put("sysomos", beat);
+
+ setChannelSpecificValues(beat, converted, mappedTags);
+
+ return converted;
+ }
+
+ protected void setChannelSpecificValues(BeatApi.BeatResponse.Beat beat, Activity converted, Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags) {
+ String mediaType = beat.getMediaType();
+ String lowerMediaType = mediaType.toLowerCase();
+ Actor actor = converted.getActor();
+ ActivityObject object = converted.getObject();
+ if ("TWITTER".equals(mediaType)) {
+ actor.setId(getPersonId(lowerMediaType, beat.getHost()));
+ actor.setDisplayName(beat.getHost());
+ actor.setUrl("http://twitter.com/" + beat.getHost());
+ object.setObjectType("tweet");
+ object.setId(getObjectId(lowerMediaType, "tweet", beat.getTweetid()));
+ } else if ("FACEBOOK".equals(mediaType)) {
+ actor.setId(getPersonId(lowerMediaType, mappedTags.get("FBID").getValue()));
+ actor.setDisplayName(beat.getTitle());
+ actor.setUrl(beat.getHost());
+ object.setObjectType("post");
+ object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+ } else {
+ actor.setId(null);
+ actor.setDisplayName(null);
+ actor.setUrl(null);
+ object.setObjectType("post");
+ object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+ }
+ }
+
+ protected void setLanguage(Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags, Map<String, Object> extensions) {
+ if(mappedTags.containsKey(LANGUAGE_KEY)) {
+ extensions.put(LANGUAGE_EXTENSION, mappedTags.get(LANGUAGE_KEY).getValue());
+ }
+ }
+
+ protected void setObject(BeatApi.BeatResponse.Beat beat, Activity converted) {
+ ActivityObject object = new ActivityObject();
+ converted.setObject(object);
+ object.setUrl(beat.getLink());
+ object.setContent(beat.getContent());
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void setLocation(BeatApi.BeatResponse.Beat beat, Map<String, Object> extensions) {
+ Map<String, Object> location;
+ String country = beat.getLocation().getCountry();
+ if(StringUtils.isNotBlank(country)) {
+ if (extensions.containsKey(LOCATION_EXTENSION)) {
+ location = (Map<String, Object>) extensions.get(LOCATION_EXTENSION);
+ } else {
+ location = Maps.newHashMap();
+ extensions.put(LOCATION_EXTENSION, location);
+ }
+ location.put(LOCATION_EXTENSION_COUNTRY, country);
+ }
+ }
+
+ protected void setProvider(BeatApi.BeatResponse.Beat beat, Activity converted) {
+ Provider provider = new Provider();
+ String mediaType = beat.getMediaType().toLowerCase();
+ provider.setId(getProviderId(mediaType));
+ provider.setDisplayName(StringUtils.capitalize(mediaType));
+ converted.setProvider(provider);
+ }
+
+ protected Map<String, BeatApi.BeatResponse.Beat.Tag> mapTags(BeatApi.BeatResponse.Beat beat) {
+ Map<String, BeatApi.BeatResponse.Beat.Tag> tags = Maps.newHashMap();
+ for(BeatApi.BeatResponse.Beat.Tag tag : beat.getTag()) {
+ if(tag.getSystemType() != null) {
+ tags.put(tag.getSystemType(), tag);
+ }
+ }
+ return tags;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
new file mode 100644
index 0000000..187d402
--- /dev/null
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/org/apache/streams/sysomos/proessor/SysomosTypeConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.sysomos.org.apache.streams.sysomos.proessor;
+
+import com.google.common.collect.Lists;
+import com.sysomos.xml.BeatApi;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
+
+import java.util.List;
+
+/**
+ * Stream processor that converts Sysomos type to Activity
+ */
+public class SysomosTypeConverter implements StreamsProcessor {
+
+ private SysomosBeatActivityConverter converter;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
+ entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
+ return Lists.newArrayList(entry);
+ } else {
+ return Lists.newArrayList();
+ }
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ converter = new SysomosBeatActivityConverter();
+ }
+
+ @Override
+ public void cleanUp() {
+ //NOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e6ffe29e/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
index ad3809f..23903e5 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/ActivitySerializer.java
@@ -23,6 +23,7 @@ import org.apache.streams.pojo.json.Activity;
import java.util.List;
+//TODO: Change the name of this class to ActivityConverter STREAMS-68
/**
* Serializes and deserializes Activities
*/