You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/19 20:35:58 UTC

[1/6] incubator-streams git commit: start level-up gplus providers

Repository: incubator-streams
Updated Branches:
  refs/heads/master 104f29b1e -> dd58c877b


start level-up gplus providers


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7ee7d4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7ee7d4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7ee7d4d

Branch: refs/heads/master
Commit: b7ee7d4d9845d15685ae6b9e5b8a332bbc18f891
Parents: 3234cdb
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 14 09:33:44 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 14 09:33:44 2016 -0500

----------------------------------------------------------------------
 .../google-gplus/pom.xml                        |  9 ++-
 .../provider/GPlusUserActivityProvider.java     | 56 ++++++++++++++++++
 .../gplus/provider/GPlusUserDataProvider.java   | 60 ++++++++++++++++++++
 .../providers/GPlusUserActivityProviderIT.java  | 52 +++++++++++++++++
 .../test/providers/GPlusUserDataProviderIT.java | 52 +++++++++++++++++
 .../resources/GPlusUserActivityProviderIT.conf  | 22 +++++++
 .../test/resources/GPlusUserDataProviderIT.conf | 22 +++++++
 7 files changed, 270 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/pom.xml b/streams-contrib/streams-provider-google/google-gplus/pom.xml
index e99fb5f..1856ff9 100644
--- a/streams-contrib/streams-provider-google/google-gplus/pom.xml
+++ b/streams-contrib/streams-provider-google/google-gplus/pom.xml
@@ -30,8 +30,11 @@
     <description>Google+ Provider</description>
 
     <properties>
+        <google.client.version>1.22.0</google.client.version>
+        <gplus.client.version>v1-rev457-1.22.0</gplus.client.version>
         <skipITs>true</skipITs>
         <testDataBaseURl>http://streams.peoplepattern.com.s3.amazonaws.com/test-data/</testDataBaseURl>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
     <repositories>
@@ -94,17 +97,17 @@
         <dependency>
             <groupId>com.google.apis</groupId>
             <artifactId>google-api-services-plus</artifactId>
-            <version>v1-rev184-1.19.0</version>
+            <version>${gplus.client.version}</version>
         </dependency>
         <dependency>
             <groupId>com.google.api-client</groupId>
             <artifactId>google-api-client</artifactId>
-            <version>1.17.0-rc</version>
+            <version>${google.client.version}</version>
         </dependency>
         <dependency>
             <groupId>com.google.http-client</groupId>
             <artifactId>google-http-client-jackson2</artifactId>
-            <version>1.17.0-rc</version>
+            <version>${google.client.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
index 0ab75e6..c890cc1 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
@@ -19,12 +19,27 @@
 package com.google.gplus.provider;
 
 import com.google.api.services.plus.Plus;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.GPlusConfiguration;
 import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -50,4 +65,45 @@ public class GPlusUserActivityProvider extends AbstractGPlusProvider{
     protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
         return new GPlusUserActivityCollector(plus, queue, strategy, userInfo);
     }
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus");
+        GPlusUserActivityProvider provider = new GPlusUserActivityProvider(config);
+
+        Gson gson = new Gson();
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                if (datum.getDocument() instanceof String)
+                    json = (String) datum.getDocument();
+                else
+                    json = gson.toJson(datum.getDocument());
+                json = gson.toJson(datum.getDocument());
+                outStream.println(json);
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
index 2effdea..e264318 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
@@ -18,13 +18,32 @@
 
 package com.google.gplus.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.services.plus.Plus;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.GPlusConfiguration;
 import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -50,4 +69,45 @@ public class GPlusUserDataProvider extends AbstractGPlusProvider{
     protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
         return new GPlusUserDataCollector(plus, strategy, queue, userInfo);
     }
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus");
+        GPlusUserDataProvider provider = new GPlusUserDataProvider(config);
+
+        Gson gson = new Gson();
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                if (datum.getDocument() instanceof String)
+                    json = (String) datum.getDocument();
+                else
+                    json = gson.toJson(datum.getDocument());
+                json = gson.toJson(datum.getDocument());
+                outStream.println(json);
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
new file mode 100644
index 0000000..9fc470e
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.test.providers;
+
+import com.google.common.collect.Lists;
+import com.google.gplus.provider.GPlusUserActivityProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class GPlusUserActivityProviderIT {
+
+    @Test
+    public void testGPlusUserActivityProvider() throws Exception {
+
+        String configfile = "./target/test-classes/GPlusUserActivityProviderIT.conf";
+        String outfile = "./target/test-classes/GPlusUserActivityProviderIT.stdout.txt";
+
+        GPlusUserActivityProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+        File out = new File(outfile);
+        assert (out.exists());
+        assert (out.canRead());
+        assert (out.isFile());
+
+        FileReader outReader = new FileReader(out);
+        LineNumberReader outCounter = new LineNumberReader(outReader);
+
+        while(outCounter.readLine() != null) {}
+
+        assert (outCounter.getLineNumber() >= 1);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
new file mode 100644
index 0000000..574dcc1
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.test.providers;
+
+import com.google.common.collect.Lists;
+import com.google.gplus.provider.GPlusUserDataProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class GPlusUserDataProviderIT {
+
+    @Test
+    public void testGPlusUserDataProvider() throws Exception {
+
+        String configfile = "./target/test-classes/GPlusUserDataProviderIT.conf";
+        String outfile = "./target/test-classes/GPlusUserDataProviderIT.stdout.txt";
+
+        GPlusUserDataProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+        File out = new File(outfile);
+        assert (out.exists());
+        assert (out.canRead());
+        assert (out.isFile());
+
+        FileReader outReader = new FileReader(out);
+        LineNumberReader outCounter = new LineNumberReader(outReader);
+
+        while(outCounter.readLine() != null) {}
+
+        assert (outCounter.getLineNumber() > 1);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf
new file mode 100644
index 0000000..6c8ecc1
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+gplus {
+  gplusUsers = [
+    {
+      userId = "+apache"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf
new file mode 100644
index 0000000..6c8ecc1
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+gplus {
+  gplusUsers = [
+    {
+      userId = "+apache"
+    }
+  ]
+}
\ No newline at end of file


[5/6] incubator-streams git commit: fix failing ITs

Posted by sb...@apache.org.
fix failing ITs


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ee1d3c09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ee1d3c09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ee1d3c09

Branch: refs/heads/master
Commit: ee1d3c0956285f05aa7f69393267595da071b02f
Parents: 770a8cb
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 14 18:38:47 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 14 18:38:47 2016 -0500

----------------------------------------------------------------------
 .../util/GPlusActivityDeserializer.java         | 29 ++++++++++----------
 .../util/GPlusPersonDeserializer.java           | 12 ++++----
 .../serializer/util/GooglePlusActivityUtil.java |  4 ++-
 .../processor/GooglePlusActivitySerDeIT.java    |  4 +--
 4 files changed, 27 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee1d3c09/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
index 988e138..7ff1d1e 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusActivityDeserializer.java
@@ -142,20 +142,21 @@ public class GPlusActivityDeserializer extends JsonDeserializer<Activity> {
      */
     private List<Activity.PlusObject.Attachments> buildAttachments(JsonNode objectNode) {
         List<Activity.PlusObject.Attachments> attachments = Lists.newArrayList();
-        for (JsonNode attachmentNode : objectNode.get("attachments")) {
-            Activity.PlusObject.Attachments attachments1 = new Activity.PlusObject.Attachments();
-            attachments1.setObjectType(attachmentNode.get("objectType").asText());
-            attachments1.setDisplayName(attachmentNode.get("displayName").asText());
-            attachments1.setContent(attachmentNode.get("content").asText());
-            attachments1.setUrl(attachmentNode.get("url").asText());
-
-            Activity.PlusObject.Attachments.Image image1 = new Activity.PlusObject.Attachments.Image();
-            JsonNode imageNode1 = attachmentNode.get("image");
-            image1.setUrl(imageNode1.get("url").asText());
-            attachments1.setImage(image1);
-
-            attachments.add(attachments1);
-        }
+        if( objectNode.has("attachments") )
+            for (JsonNode attachmentNode : objectNode.get("attachments")) {
+                Activity.PlusObject.Attachments attachments1 = new Activity.PlusObject.Attachments();
+                attachments1.setObjectType(attachmentNode.get("objectType").asText());
+                if( attachmentNode.has("displayName")) attachments1.setDisplayName(attachmentNode.get("displayName").asText());
+                if( attachmentNode.has("content")) attachments1.setContent(attachmentNode.get("content").asText());
+                if( attachmentNode.has("url")) attachments1.setUrl(attachmentNode.get("url").asText());
+
+                Activity.PlusObject.Attachments.Image image1 = new Activity.PlusObject.Attachments.Image();
+                JsonNode imageNode1 = attachmentNode.get("image");
+                image1.setUrl(imageNode1.get("url").asText());
+                attachments1.setImage(image1);
+
+                attachments.add(attachments1);
+            }
 
         return attachments;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee1d3c09/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
index 179f148..e562d4f 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GPlusPersonDeserializer.java
@@ -91,13 +91,15 @@ public class GPlusPersonDeserializer extends JsonDeserializer<Person> {
             person.setVerified(node.get("verified").asBoolean());
 
             List<Person.Emails> emails = Lists.newArrayList();
-            for (JsonNode emailNode : node.get("emails")) {
-                Person.Emails email = m.readValue(m.writeValueAsString(emailNode), Person.Emails.class);
-                emails.add(email);
+            if( node.has("emails")) {
+                for (JsonNode emailNode : node.get("emails")) {
+                    Person.Emails email = m.readValue(m.writeValueAsString(emailNode), Person.Emails.class);
+                    emails.add(email);
+                }
             }
 
-            person.setTagline(node.get("tagline").asText());
-            person.setAboutMe(node.get("aboutMe").asText());
+            if( node.has("tagline")) person.setTagline(node.get("tagline").asText());
+            if( node.has("aboutMe")) person.setAboutMe(node.get("aboutMe").asText());
         } catch (Exception e) {
             LOGGER.error("Exception while trying to deserialize a Person object: {}", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee1d3c09/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
index 7f2a134..fb16180 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/serializer/util/GooglePlusActivityUtil.java
@@ -149,10 +149,12 @@ public class GooglePlusActivityUtil {
      * @param gPlusActivity
      */
     private static void addGPlusExtensions(Activity activity, com.google.api.services.plus.model.Activity gPlusActivity) {
+
+        activity.getAdditionalProperties().put("googlePlus", gPlusActivity);
+
         Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
 
         com.google.api.services.plus.model.Activity.PlusObject object = gPlusActivity.getObject();
-        extensions.put("googlePlus", gPlusActivity);
 
         if(object != null) {
             com.google.api.services.plus.model.Activity.PlusObject.Plusoners plusoners = object.getPlusoners();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee1d3c09/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
index bc6c33a..7edce7d 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
@@ -25,6 +25,7 @@ import com.google.gplus.serializer.util.GPlusActivityDeserializer;
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
 import org.apache.commons.lang.StringUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
 import org.apache.streams.pojo.json.Provider;
@@ -96,9 +97,8 @@ public class GooglePlusActivitySerDeIT {
                     assertNotNull(activity.getTitle());
                     assertNotNull(activity.getUrl());
 
-                    Map<String, Object> extensions = (Map<String, Object>)activity.getAdditionalProperties().get("extensions");
+                    Map<String, Object> extensions = ExtensionUtil.getInstance().getExtensions(activity);
                     assertNotNull(extensions);
-                    assertNotNull(extensions.get("googlePlus"));
 
                     if(activity.getContent() != null) {
                         assertNotNull(extensions.get("rebroadcasts"));


[3/6] incubator-streams git commit: isolate isRunning and readCurrent (STREAMS-425)

Posted by sb...@apache.org.
isolate isRunning and readCurrent (STREAMS-425)


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a0fb1937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a0fb1937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a0fb1937

Branch: refs/heads/master
Commit: a0fb1937df887b789db7a57bfab1fc99f8db45b8
Parents: 3f80b0c
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 14 15:49:54 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 14 15:49:54 2016 -0500

----------------------------------------------------------------------
 .../gplus/provider/AbstractGPlusProvider.java   | 71 ++++++++++++--------
 1 file changed, 43 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a0fb1937/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
index 44e1b03..b9e9b2d 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
@@ -29,6 +29,10 @@ import com.google.api.services.plus.Plus;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gson.Gson;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
@@ -48,6 +52,7 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.security.GeneralSecurityException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -74,7 +79,11 @@ public abstract class AbstractGPlusProvider implements StreamsProvider {
     private static final Gson GSON = new Gson();
 
     private GPlusConfiguration config;
-    private ExecutorService executor;
+
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+    private ListeningExecutorService executor;
+
     private BlockingQueue<StreamsDatum> datumQueue;
     private BlockingQueue<Runnable> runnables;
     private AtomicBoolean isComplete;
@@ -94,6 +103,28 @@ public abstract class AbstractGPlusProvider implements StreamsProvider {
     }
 
     @Override
+    public void prepare(Object configurationObject) {
+
+        Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile());
+        Preconditions.checkNotNull(config.getOauth().getAppName());
+        Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress());
+
+        try {
+            this.plus = createPlusClient();
+        } catch (IOException|GeneralSecurityException e) {
+            LOGGER.error("Failed to created oauth for GPlus : {}", e);
+            throw new RuntimeException(e);
+        }
+        // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one
+        // collector unless you have multiple oauth tokens
+        //TODO make this configurable based on the number of oauth tokens
+        this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+        this.datumQueue = new LinkedBlockingQueue<>(1000);
+        this.isComplete = new AtomicBoolean(false);
+        this.previousPullWasEmpty = false;
+    }
+
+    @Override
     public void startStream() {
 
         BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
@@ -143,33 +174,6 @@ public abstract class AbstractGPlusProvider implements StreamsProvider {
         return null;
     }
 
-    @Override
-    public boolean isRunning() {
-        return !this.isComplete.get();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-        Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile());
-        Preconditions.checkNotNull(config.getOauth().getAppName());
-        Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress());
-
-        try {
-            this.plus = createPlusClient();
-        } catch (IOException|GeneralSecurityException e) {
-            LOGGER.error("Failed to created oauth for GPlus : {}", e);
-            throw new RuntimeException(e);
-        }
-        // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one
-        // collector unless you have multiple oauth tokens
-        //TODO make this configurable based on the number of oauth tokens
-        this.executor = Executors.newFixedThreadPool(1);
-        this.datumQueue = new LinkedBlockingQueue<>(1000);
-        this.isComplete = new AtomicBoolean(false);
-        this.previousPullWasEmpty = false;
-    }
-
     @VisibleForTesting
     protected Plus createPlusClient() throws IOException, GeneralSecurityException {
         credential = new GoogleCredential.Builder()
@@ -243,4 +247,15 @@ public abstract class AbstractGPlusProvider implements StreamsProvider {
         this.config.setGooglePlusUsers(gPlusUsers);
     }
 
+    @Override
+    public boolean isRunning() {
+       if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            isComplete.set(true);
+           LOGGER.info("Exiting");
+       }
+       return !isComplete.get();
+    }
+
+
 }


[4/6] incubator-streams git commit: add main methods and instructions on how to run to each Provider (STREAMS-411)

Posted by sb...@apache.org.
add main methods and instructions on how to run to each Provider (STREAMS-411)


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/770a8cb1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/770a8cb1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/770a8cb1

Branch: refs/heads/master
Commit: 770a8cb16db77372d1d50895045472211db4cd40
Parents: a0fb193
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 14 15:50:25 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 14 15:50:25 2016 -0500

----------------------------------------------------------------------
 .../provider/GPlusUserActivityProvider.java     | 13 ++++++++++
 .../gplus/provider/GPlusUserDataProvider.java   | 13 ++++++++++
 .../google-gplus/src/site/markdown/index.md     | 25 +++++++++++++++++++-
 3 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/770a8cb1/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
index 172c649..ab77973 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
@@ -42,7 +42,20 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
+ *  Retrieve recent activity from a list of accounts.
  *
+ *  To use from command line:
+ *
+ *  Supply (at least) the following required configuration in application.conf:
+ *
+ *  gplus.oauth.pathToP12KeyFile
+ *  gplus.oauth.serviceAccountEmailAddress
+ *  gplus.apiKey
+ *  gplus.googlePlusUsers
+ *
+ *  Launch using:
+ *
+ *  mvn exec:java -Dexec.mainClass=com.google.gplus.provider.GPlusUserActivityProvider -Dexec.args="application.conf activity.json"
  */
 public class GPlusUserActivityProvider extends AbstractGPlusProvider{
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/770a8cb1/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
index d31d764..1541818 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
@@ -46,7 +46,20 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
+ *  Retrieve current profile status for a list of accounts.
  *
+ *  To use from command line:
+ *
+ *  Supply (at least) the following required configuration in application.conf:
+ *
+ *  gplus.oauth.pathToP12KeyFile
+ *  gplus.oauth.serviceAccountEmailAddress
+ *  gplus.apiKey
+ *  gplus.googlePlusUsers
+ *
+ *  Launch using:
+ *
+ *  mvn exec:java -Dexec.mainClass=com.google.gplus.provider.GPlusUserDataProvider -Dexec.args="application.conf profiles.json"
  */
 public class GPlusUserDataProvider extends AbstractGPlusProvider{
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/770a8cb1/streams-contrib/streams-provider-google/google-gplus/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/site/markdown/index.md b/streams-contrib/streams-provider-google/google-gplus/src/site/markdown/index.md
index fb297e3..ec5bcee 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-google/google-gplus/src/site/markdown/index.md
@@ -1,4 +1,4 @@
-org.apache.streams:google-gmail
+org.apache.streams:google-gplus
 ===============================
 
 google-gplus contains providers, conversions, and utility classes for activity exchange with Google+
@@ -19,6 +19,29 @@ google-gplus contains providers, conversions, and utility classes for activity e
 | GPlusUserActivityProvider | [GPlusUserActivityProvider.html](apidocs/com/google/gplus/provider/GPlusUserActivityProvider.html "GPlusUserActivityProvider.html") |
 | GooglePlusTypeConverter | [GooglePlusTypeConverter.html](apidocs/com/google/gplus/processor/GooglePlusTypeConverter.html "GooglePlusTypeConverter.html") |
 
+Test:
+-----
+
+Log into admin console
+Create project
+Enable Data API on project
+Create service account
+Download p12 file
+
+Create a local file `gplus.conf` with valid gplus credentials
+
+    gplus {
+      apiKey = ""
+      oauth {
+        serviceAccountEmailAddress = ""
+        pathToP12KeyFile = ""
+      }
+    }
+    
+Build with integration testing enabled, using your credentials
+
+    mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/gplus.conf"
+    
 [JavaDocs](apidocs/index.html "JavaDocs")
 
 ###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0


[6/6] incubator-streams git commit: Merge branch '0.4-gplus'

Posted by sb...@apache.org.
Merge branch '0.4-gplus'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dd58c877
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dd58c877
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dd58c877

Branch: refs/heads/master
Commit: dd58c877b89a00882e960e37e89259de6ebba4ce
Parents: 104f29b ee1d3c0
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 19 15:33:50 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 19 15:33:50 2016 -0500

----------------------------------------------------------------------
 .../google-gplus/pom.xml                        |  9 ++-
 .../gplus/provider/AbstractGPlusProvider.java   | 71 +++++++++++--------
 .../provider/GPlusUserActivityCollector.java    |  3 +-
 .../provider/GPlusUserActivityProvider.java     | 68 ++++++++++++++++++
 .../gplus/provider/GPlusUserDataCollector.java  |  3 +-
 .../gplus/provider/GPlusUserDataProvider.java   | 72 ++++++++++++++++++++
 .../util/GPlusActivityDeserializer.java         | 29 ++++----
 .../util/GPlusPersonDeserializer.java           | 12 ++--
 .../serializer/util/GooglePlusActivityUtil.java |  4 +-
 .../google-gplus/src/site/markdown/index.md     | 25 ++++++-
 .../processor/GooglePlusActivitySerDeIT.java    |  4 +-
 .../providers/GPlusUserActivityProviderIT.java  | 52 ++++++++++++++
 .../test/providers/GPlusUserDataProviderIT.java | 52 ++++++++++++++
 .../resources/GPlusUserActivityProviderIT.conf  | 22 ++++++
 .../test/resources/GPlusUserDataProviderIT.conf | 22 ++++++
 15 files changed, 392 insertions(+), 56 deletions(-)
----------------------------------------------------------------------



[2/6] incubator-streams git commit: integration tests work

Posted by sb...@apache.org.
integration tests work


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3f80b0c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3f80b0c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3f80b0c1

Branch: refs/heads/master
Commit: 3f80b0c1948046d295910bc3c1368e839e097b7d
Parents: b7ee7d4
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 14 15:32:24 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 14 15:32:24 2016 -0500

----------------------------------------------------------------------
 .../com/google/gplus/provider/GPlusUserActivityCollector.java     | 3 ++-
 .../java/com/google/gplus/provider/GPlusUserActivityProvider.java | 1 -
 .../java/com/google/gplus/provider/GPlusUserDataCollector.java    | 3 ++-
 .../java/com/google/gplus/provider/GPlusUserDataProvider.java     | 1 -
 .../streams/gplus/test/providers/GPlusUserDataProviderIT.java     | 2 +-
 .../src/test/resources/GPlusUserActivityProviderIT.conf           | 2 +-
 .../google-gplus/src/test/resources/GPlusUserDataProviderIT.conf  | 2 +-
 7 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f80b0c1/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
index d76e225..bf8d20f 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
@@ -111,7 +111,8 @@ public class GPlusUserActivityCollector extends GPlusDataCollector {
                                 || (beforeDate == null && afterDate.isBefore(published))
                                 || (afterDate == null && beforeDate.isAfter(published))
                                 || ((afterDate != null && beforeDate != null) && (afterDate.isBefore(published) && beforeDate.isAfter(published)))) {
-                            this.datumQueue.put(new StreamsDatum(MAPPER.writeValueAsString(activity), activity.getId()));
+                            String json = MAPPER.writeValueAsString(activity);
+                            this.datumQueue.put(new StreamsDatum(json, activity.getId()));
                         } else if(afterDate != null && afterDate.isAfter(published)) {
                             feed.setNextPageToken(null); // do not fetch next page
                             break;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f80b0c1/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
index c890cc1..172c649 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
@@ -99,7 +99,6 @@ public class GPlusUserActivityProvider extends AbstractGPlusProvider{
                     json = (String) datum.getDocument();
                 else
                     json = gson.toJson(datum.getDocument());
-                json = gson.toJson(datum.getDocument());
                 outStream.println(json);
             }
         } while( provider.isRunning());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f80b0c1/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
index 90efd99..78a1649 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
@@ -78,7 +78,8 @@ public  class GPlusUserDataCollector extends GPlusDataCollector {
                 }
                 ++attempts;
             } while(tryAgain && attempts < MAX_ATTEMPTS);
-            this.datumQueue.put(new StreamsDatum(MAPPER.writeValueAsString(person), person.getId()));
+            String json = MAPPER.writeValueAsString(person);
+            this.datumQueue.put(new StreamsDatum(json, person.getId()));
         } catch (Throwable t) {
             LOGGER.warn("Unable to pull user data for user={} : {}", userInfo.getUserId(), t);
             if(t instanceof InterruptedException) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f80b0c1/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
index e264318..d31d764 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
@@ -103,7 +103,6 @@ public class GPlusUserDataProvider extends AbstractGPlusProvider{
                     json = (String) datum.getDocument();
                 else
                     json = gson.toJson(datum.getDocument());
-                json = gson.toJson(datum.getDocument());
                 outStream.println(json);
             }
         } while( provider.isRunning());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f80b0c1/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
index 574dcc1..f4b7a66 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java
@@ -46,7 +46,7 @@ public class GPlusUserDataProviderIT {
 
         while(outCounter.readLine() != null) {}
 
-        assert (outCounter.getLineNumber() > 1);
+        assert (outCounter.getLineNumber() >= 1);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f80b0c1/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf
index 6c8ecc1..0dbd065 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf
@@ -14,7 +14,7 @@
 # specific language governing permissions and limitations
 # under the License.
 gplus {
-  gplusUsers = [
+  googlePlusUsers = [
     {
       userId = "+apache"
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3f80b0c1/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf
index 6c8ecc1..0dbd065 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf
@@ -14,7 +14,7 @@
 # specific language governing permissions and limitations
 # under the License.
 gplus {
-  gplusUsers = [
+  googlePlusUsers = [
     {
       userId = "+apache"
     }