You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/20 23:00:12 UTC

[1/2] incubator-streams git commit: STREAMS-229 | Updated the GPlusTypeConverter to handle serialized versions of Google's Activity and Person objects, in addition to those objects themselves

Repository: incubator-streams
Updated Branches:
  refs/heads/master 1b5574142 -> cb9c6f548


STREAMS-229 | Updated the GPlusTypeConverter to handle serialized versions of Google's Activity and Person objects, in addition to those objects themselves


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3fba6933
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3fba6933
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3fba6933

Branch: refs/heads/master
Commit: 3fba69330d784715dba1116bd0833dbc1f155e65
Parents: d0b5a0a
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Wed Nov 19 13:59:01 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Wed Nov 19 13:59:01 2014 -0600

----------------------------------------------------------------------
 .../processor/GooglePlusTypeConverter.java      | 38 +++++++++++
 .../provider/GPlusUserActivityCollector.java    | 29 +++++++-
 .../serializer/util/GPlusEventClassifier.java   | 57 ++++++++++++++++
 .../util/GPlusEventClassifierTest.java          | 69 ++++++++++++++++++++
 4 files changed, 192 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fba6933/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
index b4cf21d..73e261f 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
@@ -18,12 +18,20 @@
 
 package com.google.gplus.processor;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.api.services.plus.model.Person;
 import com.google.common.collect.Lists;
+import com.google.gplus.serializer.util.GPlusActivityDeserializer;
+import com.google.gplus.serializer.util.GPlusEventClassifier;
+import com.google.gplus.serializer.util.GPlusPersonDeserializer;
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +42,7 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
     public final static String STREAMS_ID = "GooglePlusTypeConverter";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(GooglePlusTypeConverter.class);
+    private StreamsJacksonMapper mapper;
     private Queue<Person> inQueue;
     private Queue<StreamsDatum> outQueue;
     private GooglePlusActivityUtil googlePlusActivityUtil;
@@ -59,6 +68,10 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
             LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
             Activity activity = null;
 
+            if(item instanceof String) {
+                item = deserializeItem(item);
+            }
+
             if(item instanceof Person) {
                 activity = new Activity();
                 googlePlusActivityUtil.updateActivity((Person)item, activity);
@@ -82,9 +95,34 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
             return Lists.newArrayList();
     }
 
+    /** Turns a serialized Google Person/Activity into its model object; any other or unparseable input is returned unchanged. */
+    private Object deserializeItem(Object item) {
+        try {
+            // detectClass returns null for unparseable JSON, so compare from the constant side to stay null-safe
+            Class klass = GPlusEventClassifier.detectClass((String) item);
+            if (Person.class.equals(klass)) {
+                return mapper.readValue((String) item, Person.class);
+            } else if (com.google.api.services.plus.model.Activity.class.equals(klass)) {
+                return mapper.readValue((String) item, com.google.api.services.plus.model.Activity.class);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Exception while trying to deserializeItem", e);
+        }
+        return item;
+    }
+
     @Override
     public void prepare(Object configurationObject) {
         googlePlusActivityUtil = new GooglePlusActivityUtil();
+        mapper = new StreamsJacksonMapper(); // mapper that deserializeItem uses to parse String inputs
+
+        SimpleModule simpleModule = new SimpleModule();
+        simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); // custom deserializer for Google's Person model
+        mapper.registerModule(simpleModule);
+
+        simpleModule = new SimpleModule(); // a second, separate module carrying the Activity deserializer
+        simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, new GPlusActivityDeserializer());
+        mapper.registerModule(simpleModule);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fba6933/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
index 04f0aef..f1824b5 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
@@ -1,8 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package com.google.gplus.provider;
 
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.services.plus.Plus;
 import com.google.api.services.plus.model.Activity;
@@ -11,12 +32,12 @@ import com.google.gplus.serializer.util.GPlusActivityDeserializer;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffException;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 
 /**
@@ -41,6 +62,12 @@ public class GPlusUserActivityCollector extends GPlusDataCollector {
     static { //set up mapper for Google Activity Object
         SimpleModule simpleModule = new SimpleModule();
         simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer());
+        simpleModule.addSerializer(com.google.api.client.util.DateTime.class, new StdSerializer<com.google.api.client.util.DateTime>(com.google.api.client.util.DateTime.class) {
+            @Override
+            public void serialize(com.google.api.client.util.DateTime dateTime, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonGenerationException {
+                jsonGenerator.writeString(dateTime.toStringRfc3339()); // emit the Google DateTime as its RFC 3339 string form
+            }
+        });
         MAPPER.registerModule(simpleModule);
         MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fba6933/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
new file mode 100644
index 0000000..2866b5b
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.google.gplus.serializer.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.Person;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public class GPlusEventClassifier implements Serializable {
+    private static ObjectMapper mapper = new StreamsJacksonMapper();
+    private static final String ACTIVITY_IDENTIFIER = "\"plus#activity\"";
+    private static final String PERSON_IDENTIFIER = "\"plus#person\"";
+
+    /** Returns Activity.class or Person.class per the top-level "kind" of json, ObjectNode.class otherwise, null if unparseable. */
+    public static Class detectClass(String json) {
+        Preconditions.checkNotNull(json);
+        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+        try {
+            com.fasterxml.jackson.databind.JsonNode root = mapper.readTree(json);
+            // Read only the top-level field: a "kind" nested in a child object must not be mistaken for the document's own.
+            com.fasterxml.jackson.databind.JsonNode kind = (root == null) ? null : root.get("kind");
+            String kindJson = (kind == null) ? "" : kind.toString();
+            if (ACTIVITY_IDENTIFIER.equals(kindJson)) {
+                return Activity.class;
+            } else if (PERSON_IDENTIFIER.equals(kindJson)) {
+                return Person.class;
+            }
+            return ObjectNode.class;
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fba6933/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
new file mode 100644
index 0000000..308afe0
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.google.gplus.serializer.util;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.Person;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that {@link GPlusEventClassifier#detectClass(String)} maps the "kind" of serialized Google+ objects to a class.
+ *
+ * <p>Test methods declare {@code throws Exception} instead of catching it, so a serialization or parsing failure
+ * fails the test rather than being silently ignored.
+ */
+public class GPlusEventClassifierTest {
+    private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /** A document whose kind is "plus#activity" is classified as a Google Activity. */
+    @Test
+    public void classifyActivityTest() throws Exception {
+        Activity activity = new Activity();
+        activity.setKind("plus#activity");
+
+        Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(activity));
+
+        assertEquals(Activity.class, retClass);
+    }
+
+    /** A document whose kind is "plus#person" is classified as a Google Person. */
+    @Test
+    public void classifyPersonTest() throws Exception {
+        Person person = new Person();
+        person.setKind("plus#person");
+
+        Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
+
+        assertEquals(Person.class, retClass);
+    }
+
+    /** A document with an unrecognized kind falls back to ObjectNode. */
+    @Test
+    public void classifObjectNodeTest() throws Exception {
+        Person person = new Person();
+        person.setKind("fake");
+
+        Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
+
+        assertEquals(ObjectNode.class, retClass);
+    }
+}


[2/2] incubator-streams git commit: Merge branch 'STREAMS-229'

Posted by sb...@apache.org.
Merge branch 'STREAMS-229'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cb9c6f54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cb9c6f54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cb9c6f54

Branch: refs/heads/master
Commit: cb9c6f548c7e4d41e90e09fd31ab07deb811bf16
Parents: 1b55741 3fba693
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 15:59:18 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 15:59:18 2014 -0600

----------------------------------------------------------------------
 .../processor/GooglePlusTypeConverter.java      | 38 +++++++++++
 .../provider/GPlusUserActivityCollector.java    | 29 +++++++-
 .../serializer/util/GPlusEventClassifier.java   | 57 ++++++++++++++++
 .../util/GPlusEventClassifierTest.java          | 69 ++++++++++++++++++++
 4 files changed, 192 insertions(+), 1 deletion(-)
----------------------------------------------------------------------