You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/20 22:59:31 UTC
incubator-streams git commit: STREAMS-229 | Updated the
GPlusTypeConverter to handle serialized versions of Google's Activity and
Person objects, in addition to those objects themselves
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-229 [created] 3fba69330
STREAMS-229 | Updated the GPlusTypeConverter to handle serialized versions of Google's Activity and Person objects, in addition to those objects themselves
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3fba6933
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3fba6933
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3fba6933
Branch: refs/heads/STREAMS-229
Commit: 3fba69330d784715dba1116bd0833dbc1f155e65
Parents: d0b5a0a
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Wed Nov 19 13:59:01 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Wed Nov 19 13:59:01 2014 -0600
----------------------------------------------------------------------
.../processor/GooglePlusTypeConverter.java | 38 +++++++++++
.../provider/GPlusUserActivityCollector.java | 29 +++++++-
.../serializer/util/GPlusEventClassifier.java | 57 ++++++++++++++++
.../util/GPlusEventClassifierTest.java | 69 ++++++++++++++++++++
4 files changed, 192 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fba6933/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
index b4cf21d..73e261f 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
@@ -18,12 +18,20 @@
package com.google.gplus.processor;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.services.plus.model.Person;
import com.google.common.collect.Lists;
+import com.google.gplus.serializer.util.GPlusActivityDeserializer;
+import com.google.gplus.serializer.util.GPlusEventClassifier;
+import com.google.gplus.serializer.util.GPlusPersonDeserializer;
import com.google.gplus.serializer.util.GooglePlusActivityUtil;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +42,7 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
public final static String STREAMS_ID = "GooglePlusTypeConverter";
private final static Logger LOGGER = LoggerFactory.getLogger(GooglePlusTypeConverter.class);
+ private StreamsJacksonMapper mapper;
private Queue<Person> inQueue;
private Queue<StreamsDatum> outQueue;
private GooglePlusActivityUtil googlePlusActivityUtil;
@@ -59,6 +68,10 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
Activity activity = null;
+ if(item instanceof String) {
+ item = deserializeItem(item);
+ }
+
if(item instanceof Person) {
activity = new Activity();
googlePlusActivityUtil.updateActivity((Person)item, activity);
@@ -82,9 +95,34 @@ public class GooglePlusTypeConverter implements StreamsProcessor {
return Lists.newArrayList();
}
+    /**
+     * Attempts to turn a serialized Google+ document into the matching Google model object.
+     *
+     * The JSON is first classified by {@link GPlusEventClassifier#detectClass(String)}; only
+     * documents classified as {@link Person} or Google's Activity are deserialized.
+     *
+     * @param item the datum payload; the caller only routes it here when it is a {@link String}
+     * @return the deserialized Person or Google Activity, or {@code item} itself, unchanged,
+     *         when the document is of another kind, cannot be classified, or fails to deserialize
+     */
+    private Object deserializeItem(Object item) {
+        try {
+            String json = (String) item;
+            Class<?> klass = GPlusEventClassifier.detectClass(json);
+
+            // Constant-first equals: detectClass may return null for unparseable JSON, and
+            // that case should fall through quietly instead of raising a NullPointerException.
+            if (Person.class.equals(klass)) {
+                return mapper.readValue(json, Person.class);
+            } else if (com.google.api.services.plus.model.Activity.class.equals(klass)) {
+                return mapper.readValue(json, com.google.api.services.plus.model.Activity.class);
+            }
+        } catch (Exception e) {
+            // Best effort: the payload fills the placeholder and the exception is passed last so
+            // SLF4J records its stack trace; the original item is handed back below.
+            LOGGER.error("Exception while trying to deserializeItem: {}", item, e);
+        }
+
+        return item;
+    }
+
+
@Override
public void prepare(Object configurationObject) { // builds the activity util and the JSON mapper; configurationObject is not read here
googlePlusActivityUtil = new GooglePlusActivityUtil();
+ mapper = new StreamsJacksonMapper(); // mapper that deserializeItem uses for serialized payloads
+
+ SimpleModule simpleModule = new SimpleModule();
+ simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); // custom deserializer for Google's Person model
+ mapper.registerModule(simpleModule);
+
+ simpleModule = new SimpleModule(); // a second, separate module for Google's Activity model
+ simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, new GPlusActivityDeserializer());
+ mapper.registerModule(simpleModule);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fba6933/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
index 04f0aef..f1824b5 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
@@ -1,8 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package com.google.gplus.provider;
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.plus.Plus;
import com.google.api.services.plus.model.Activity;
@@ -11,12 +32,12 @@ import com.google.gplus.serializer.util.GPlusActivityDeserializer;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.google.gplus.configuration.UserInfo;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffException;
import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.concurrent.BlockingQueue;
/**
@@ -41,6 +62,12 @@ public class GPlusUserActivityCollector extends GPlusDataCollector {
static { //set up mapper for Google Activity Object
SimpleModule simpleModule = new SimpleModule();
simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer());
+ simpleModule.addSerializer(com.google.api.client.util.DateTime.class, new StdSerializer<com.google.api.client.util.DateTime>(com.google.api.client.util.DateTime.class) { // serializer for the Google client DateTime type
+ @Override
+ public void serialize(com.google.api.client.util.DateTime dateTime, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonGenerationException { // NOTE(review): JsonGenerationException looks redundant next to IOException — confirm
+ jsonGenerator.writeString(dateTime.toStringRfc3339()); // written as a single JSON string holding the RFC 3339 text
+ }
+ });
MAPPER.registerModule(simpleModule);
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // unknown JSON properties must not fail deserialization
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fba6933/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
new file mode 100644
index 0000000..2866b5b
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusEventClassifier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.google.gplus.serializer.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.Person;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Classifies raw Google+ JSON documents by their top-level {@code kind} field so that
+ * callers can pick the Google model class to deserialize the document into.
+ */
+public class GPlusEventClassifier implements Serializable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(GPlusEventClassifier.class);
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+    // JsonNode#toString() renders a text node with its surrounding quotes, hence the quoted constants.
+    private static final String ACTIVITY_IDENTIFIER = "\"plus#activity\"";
+    private static final String PERSON_IDENTIFIER = "\"plus#person\"";
+
+    /**
+     * Determines which Google+ model class a JSON document represents.
+     *
+     * @param json the serialized document; must be neither {@code null} nor empty
+     * @return {@link Activity} when the top-level {@code kind} is {@code plus#activity},
+     *         {@link Person} when it is {@code plus#person}, {@link ObjectNode} for any other
+     *         JSON object, or {@code null} when the text cannot be parsed into a JSON object
+     * @throws NullPointerException if {@code json} is {@code null}
+     * @throws IllegalArgumentException if {@code json} is empty
+     */
+    public static Class<?> detectClass(String json) {
+        Preconditions.checkNotNull(json);
+        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+        JsonNode root;
+        try {
+            root = mapper.readTree(json);
+        } catch (IOException e) {
+            LOGGER.warn("Unable to parse Google+ JSON for classification", e);
+            return null;
+        }
+
+        // Arrays, scalars and blank content are not classifiable; report them the same way as
+        // unparseable text rather than failing on an unchecked cast to ObjectNode.
+        if (!(root instanceof ObjectNode)) {
+            LOGGER.warn("Google+ JSON is not a JSON object and cannot be classified");
+            return null;
+        }
+
+        // Only the top-level field is inspected. Checking with findValue (which also searches
+        // nested objects) and then reading with get raised a NullPointerException whenever
+        // "kind" existed only further down the tree.
+        JsonNode kind = root.get("kind");
+        if (kind != null) {
+            String kindJson = kind.toString();
+            if (ACTIVITY_IDENTIFIER.equals(kindJson)) {
+                return Activity.class;
+            } else if (PERSON_IDENTIFIER.equals(kindJson)) {
+                return Person.class;
+            }
+        }
+
+        return ObjectNode.class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fba6933/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
new file mode 100644
index 0000000..308afe0
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.google.gplus.serializer.util;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.Person;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that {@link GPlusEventClassifier#detectClass(String)} maps the {@code kind} field of a
+ * serialized Google+ object to the expected class.
+ *
+ * The tests declare {@code throws Exception} instead of catching it: a failure while
+ * serializing or classifying must fail the test rather than let it pass without asserting.
+ */
+public class GPlusEventClassifierTest {
+    private static final StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /** A document whose kind is {@code plus#activity} is classified as Google's Activity. */
+    @Test
+    public void classifyActivityTest() throws Exception {
+        Activity activity = new Activity();
+        activity.setKind("plus#activity");
+
+        Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(activity));
+
+        assertEquals(Activity.class, retClass);
+    }
+
+    /** A document whose kind is {@code plus#person} is classified as Google's Person. */
+    @Test
+    public void classifyPersonTest() throws Exception {
+        Person person = new Person();
+        person.setKind("plus#person");
+
+        Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
+
+        assertEquals(Person.class, retClass);
+    }
+
+    /** A document with an unrecognised kind falls back to the generic ObjectNode. */
+    @Test
+    public void classifObjectNodeTest() throws Exception {
+        Person person = new Person();
+        person.setKind("fake");
+
+        Class retClass = GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
+
+        assertEquals(ObjectNode.class, retClass);
+    }
+}