You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 22:10:20 UTC
[05/14] git commit: created StreamsDatasiftMapper, an ObjectMapper capable of parsing
either RFC3339 or the standard Datasift date string; changed all classes in the
package except the configurator to use this mapper
Created StreamsDatasiftMapper, an ObjectMapper capable of parsing dates in either RFC3339 format or the standard Datasift date format.
Changed all classes in the package except the configurator to use this mapper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7301326
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7301326
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7301326
Branch: refs/heads/master
Commit: b7301326a83936a528219c5b84b35a86a57d5971
Parents: bdf30f7
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:24:14 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:24:14 2014 -0500
----------------------------------------------------------------------
.../provider/DatasiftStreamProvider.java | 4 +-
.../DatasiftTypeConverterProcessor.java | 7 +-
.../serializer/DatasiftActivitySerializer.java | 4 +-
.../DatasiftDefaultActivitySerializer.java | 21 ++---
.../DatasiftTweetActivitySerializer.java | 14 ++--
.../datasift/util/StreamsDatasiftMapper.java | 84 ++++++++++++++++++++
.../DatasiftActivitySerializerTest.java | 31 +++++++-
7 files changed, 135 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index 09c01b0..8ed1443 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -35,7 +35,7 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -202,7 +202,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
public void prepare(Object configurationObject) {
this.interactions = new ConcurrentLinkedQueue<Interaction>();
this.clients = Maps.newHashMap();
- this.mapper = StreamsJacksonMapper.getInstance();
+ this.mapper = StreamsDatasiftMapper.getInstance();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
index 203e5e8..0b847a4 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
@@ -21,10 +21,11 @@ package org.apache.streams.datasift.provider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
-import org.apache.streams.core.*;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.datasift.Datasift;
import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
import org.apache.streams.pojo.json.Activity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +67,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
@Override
public void prepare(Object configurationObject) {
- this.mapper = StreamsJacksonMapper.getInstance();
+ this.mapper = StreamsDatasiftMapper.getInstance();
this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
if(this.outClass.equals(Activity.class)) {
this.converter = new ActivityConverter();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
index 1d5fd2c..7d644f0 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -21,8 +21,8 @@ package org.apache.streams.datasift.serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import java.util.List;
@@ -34,7 +34,7 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>
private static final DatasiftTweetActivitySerializer TWITTER_SERIALIZER = new DatasiftTweetActivitySerializer();
private static final DatasiftDefaultActivitySerializer DEFAULT_SERIALIZER = new DatasiftDefaultActivitySerializer();
- private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
@Override
public String serializationFormat() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
index 4095df6..b70aa12 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
@@ -1,15 +1,13 @@
package org.apache.streams.datasift.serializer;
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.*;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.interaction.Links;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
import org.apache.streams.pojo.json.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +28,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
- ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
@Override
public String serializationFormat() {
@@ -46,7 +44,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
try {
return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
} catch (Exception e) {
- LOGGER.error("Excpetion while trying convert,\n {},\n to a Datasift object.", datasiftJson);
+ LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
LOGGER.error("Exception : {}", e);
throw new RuntimeException(e);
}
@@ -55,13 +53,6 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
@Override
public Activity deserialize(Datasift serialized) {
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
- mapper.setAnnotationIntrospector(introspector);
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
- mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
try {
Activity activity = convert(serialized);
@@ -141,7 +132,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
Actor actor = new Actor();
org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
if(author == null) {
- LOGGER.warn("Interactiond does not contain author information.");
+ LOGGER.warn("Interaction does not contain author information.");
return actor;
}
String userName = author.getUsername();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index e7b0a52..b16aae2 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -22,8 +22,8 @@ package org.apache.streams.datasift.serializer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.streams.data.util.RFC3339Utils;
import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Author;
import org.apache.streams.datasift.interaction.Interaction;
import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
import org.apache.streams.datasift.twitter.Retweet;
@@ -134,7 +134,7 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
.orNull()));
actor.setSummary(user.getDescription());
try {
- actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
+ actor.setPublished(user.getCreatedAt());
} catch (Exception e) {
LOGGER.warn("Exception trying to parse date : {}", e);
}
@@ -149,14 +149,16 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
extensions.put("followers", user.getFollowersCount());
extensions.put("screenName", user.getScreenName());
if(user.getAdditionalProperties() != null) {
- extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
+ extensions.put("favorites", user.getFavouritesCount());
}
Image profileImage = new Image();
String profileUrl = null;
- profileUrl = event.getInteraction().getAuthor().getAvatar();
- if(profileUrl == null && user.getAdditionalProperties() != null) {
- Object url = user.getAdditionalProperties().get("profile_image_url_https");
+ Author author = event.getInteraction().getAuthor();
+ if( author != null )
+ profileUrl = author.getAvatar();
+ if(profileUrl == null && user.getProfileImageUrlHttps() != null) {
+ Object url = user.getProfileImageUrlHttps();
if(url instanceof String)
profileUrl = (String) url;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
new file mode 100644
index 0000000..c5f2abf
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.datasift.util;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsDatasiftMapper extends StreamsJacksonMapper {
+
+ public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z");
+
+ public static final Long getMillis(String dateTime) {
+
+ // this function is for pig which doesn't handle exceptions well
+ try {
+ Long result = DATASIFT_FORMAT.parseMillis(dateTime);
+ return result;
+ } catch( Exception e ) {
+ return null;
+ }
+
+ }
+
+ private static final StreamsDatasiftMapper INSTANCE = new StreamsDatasiftMapper();
+
+ public static StreamsDatasiftMapper getInstance(){
+ return INSTANCE;
+ }
+
+ public StreamsDatasiftMapper() {
+ super();
+ registerModule(new SimpleModule()
+ {
+ {
+ addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+ @Override
+ public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+ DateTime result = null;
+ try {
+ result = DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
+ } catch (Exception e) {}
+ if (result == null) {
+ try {
+ result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());
+ } catch (Exception e) {}
+ }
+ return result;
+ }
+ });
+ }
+ });
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
index 88dd2d6..90b7285 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
@@ -2,7 +2,8 @@ package org.apache.streams.datasift.serializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.Actor;
import org.junit.Test;
@@ -11,11 +12,12 @@ import java.util.Scanner;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
public class DatasiftActivitySerializerTest {
private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer();
- private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
@Test
public void testGeneralConversion() throws Exception {
@@ -39,6 +41,8 @@ public class DatasiftActivitySerializerTest {
while(scanner.hasNextLine()) {
line = scanner.nextLine();
testGeneralConversion(line);
+ testDeserNoNull(line);
+ testDeserNoAddProps(line);
System.out.println("ORIGINAL -> "+line);
System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line)));
System.out.println("NODE -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class));
@@ -59,8 +63,31 @@ public class DatasiftActivitySerializerTest {
assertNotNull(json, activity.getUrl());
Actor actor = activity.getActor();
assertNotNull(json, actor);
+
}
+ /**
+ * Test that null fields are not present
+ * @param json
+ */
+ private void testDeserNoNull(String json) throws Exception {
+ Activity ser = SERIALIZER.deserialize(json);
+ String deser = MAPPER.writeValueAsString(ser);
+ int nulls = StringUtils.countMatches(deser, ":null");
+ assertEquals(0l, (long)nulls);
+
+ }
+ /**
+ * Test that null fields are not present
+ * @param json
+ */
+ private void testDeserNoAddProps(String json) throws Exception {
+ Activity ser = SERIALIZER.deserialize(json);
+ String deser = MAPPER.writeValueAsString(ser);
+ int nulls = StringUtils.countMatches(deser, "additionalProperties:{");
+ assertEquals(0l, (long)nulls);
+
+ }
}