You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sm...@apache.org on 2017/02/08 04:32:53 UTC

[7/9] incubator-streams git commit: STREAMS-463: Move every class in all repos underneath org.apache.streams, this closes apache/incubator-streams#356

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/AbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/AbstractGPlusProvider.java
new file mode 100644
index 0000000..7c07c70
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/AbstractGPlusProvider.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.provider;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.plus.Plus;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gson.Gson;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Provider that creates a GPlus client and runs tasks that queue data to an outgoing queue.
+ *
+ * <p/>
+ * Subclasses supply the per-user collector through
+ * {@link #getDataCollector(BackOffStrategy, BlockingQueue, Plus, UserInfo)}; this class builds the
+ * Plus client, submits one collector per configured user, and hands out the queued datums in
+ * batches from {@link #readCurrent()}.
+ */
+public abstract class AbstractGPlusProvider implements StreamsProvider {
+
+  public static final String STREAMS_ID = "AbstractGPlusProvider";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGPlusProvider.class);
+  // OAuth scope requested for the service account credential.
+  private static final Set<String> SCOPE = new HashSet<String>() {
+    {
+      add("https://www.googleapis.com/auth/plus.login");
+    }
+  };
+  // Upper bound on the number of datums returned by a single readCurrent() call.
+  private static final int MAX_BATCH_SIZE = 1000;
+
+  private static final HttpTransport TRANSPORT = new NetHttpTransport();
+  private static final JacksonFactory JSON_FACTORY = new JacksonFactory();
+  private static final Gson GSON = new Gson();
+
+  private GPlusConfiguration config;
+
+  // Not populated anywhere in this class; isRunning() only checks that whatever it holds is done.
+  List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+  private ListeningExecutorService executor;
+
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private AtomicBoolean isComplete;
+  private boolean previousPullWasEmpty;
+
+  protected GoogleCredential credential;
+  protected Plus plus;
+
+  /**
+   * Creates a provider configured from the "gplus" section of the detected streams configuration.
+   */
+  public AbstractGPlusProvider() {
+    this.config = new ComponentConfigurator<>(GPlusConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus"));
+  }
+
+  /**
+   * Creates a provider that uses the supplied configuration.
+   * @param config GPlusConfiguration
+   */
+  public AbstractGPlusProvider(GPlusConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * Validates the oauth settings, builds the Plus client, and creates the executor and queue.
+   * @param configurationObject not read by this implementation; the configuration held by the
+   *                            provider is used instead
+   * @throws NullPointerException if the configuration or a required oauth setting is missing
+   * @throws RuntimeException wrapping the cause if the Plus client cannot be created
+   */
+  @Override
+  public void prepare(Object configurationObject) {
+
+    Objects.requireNonNull(config, "gplus configuration is required");
+    Objects.requireNonNull(config.getOauth(), "gplus.oauth is required");
+    Objects.requireNonNull(config.getOauth().getPathToP12KeyFile(), "gplus.oauth.pathToP12KeyFile is required");
+    Objects.requireNonNull(config.getOauth().getAppName(), "gplus.oauth.appName is required");
+    Objects.requireNonNull(config.getOauth().getServiceAccountEmailAddress(),
+        "gplus.oauth.serviceAccountEmailAddress is required");
+
+    try {
+      this.plus = createPlusClient();
+    } catch (IOException | GeneralSecurityException ex) {
+      // Pass the exception as the trailing argument so the stack trace is logged.
+      LOGGER.error("Failed to create oauth for GPlus", ex);
+      throw new RuntimeException(ex);
+    }
+    // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one
+    // collector unless you have multiple oauth tokens
+    //TODO make this configurable based on the number of oauth tokens
+    this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+    this.datumQueue = new LinkedBlockingQueue<>(1000);
+    this.isComplete = new AtomicBoolean(false);
+    this.previousPullWasEmpty = false;
+  }
+
+  /**
+   * Submits one collector per configured user, then shuts the executor down so that it
+   * terminates once the submitted collectors have finished. Users without their own after/before
+   * date are given the configured defaults (the UserInfo objects are updated in place).
+   */
+  @Override
+  public void startStream() {
+
+    BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
+    for (UserInfo user : this.config.getGooglePlusUsers()) {
+      if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) {
+        user.setAfterDate(this.config.getDefaultAfterDate());
+      }
+      if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) {
+        user.setBeforeDate(this.config.getDefaultBeforeDate());
+      }
+      this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.plus, user));
+    }
+    this.executor.shutdown();
+  }
+
+  /**
+   * Creates the task that collects data for one user and puts it on the queue.
+   * @param strategy BackOffStrategy shared by the collectors
+   * @param queue queue the collector writes its datums to
+   * @param plus Plus client
+   * @param userInfo user to collect data for
+   * @return the collector task
+   */
+  protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo);
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  /**
+   * Drains up to MAX_BATCH_SIZE datums from the internal queue into a new result set.
+   * The provider is marked complete only after two consecutive pulls that were empty while the
+   * collectors had already finished.
+   * @return StreamsResultSet holding the drained batch (possibly empty)
+   */
+  @Override
+  public StreamsResultSet readCurrent() {
+    BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
+    int batchCount = 0;
+    while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
+      StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
+      if (datum != null) {
+        ++batchCount;
+        ComponentUtils.offerUntilSuccess(datum, batch);
+      }
+    }
+    boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && collectorsFinished();
+    this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
+    this.previousPullWasEmpty = pullIsEmpty;
+    return new StreamsResultSet(batch);
+  }
+
+  /**
+   * Not supported by this provider.
+   * @param sequence sequence
+   * @return always null
+   */
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  /**
+   * Not supported by this provider.
+   * @param start start
+   * @param end end
+   * @return always null
+   */
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  /**
+   * Builds the service account credential from the configured P12 key file and wraps it in a
+   * Plus client named after the configured application. The credential is also stored in
+   * {@link #credential}.
+   * @return Plus client
+   * @throws IOException IOException
+   * @throws GeneralSecurityException GeneralSecurityException
+   */
+  @VisibleForTesting
+  protected Plus createPlusClient() throws IOException, GeneralSecurityException {
+    credential = new GoogleCredential.Builder()
+        .setJsonFactory(JSON_FACTORY)
+        .setTransport(TRANSPORT)
+        .setServiceAccountScopes(SCOPE)
+        .setServiceAccountId(this.config.getOauth().getServiceAccountEmailAddress())
+        .setServiceAccountPrivateKeyFromP12File(new File(this.config.getOauth().getPathToP12KeyFile()))
+        .build();
+    return new Plus.Builder(TRANSPORT,JSON_FACTORY, credential).setApplicationName(this.config.getOauth().getAppName()).build();
+  }
+
+  /**
+   * Shuts the executor down and releases it. Calling this more than once, or before
+   * {@link #prepare(Object)}, is a no-op.
+   */
+  @Override
+  public void cleanUp() {
+    if (this.executor != null) {
+      ComponentUtils.shutdownExecutor(this.executor, 10, 10);
+      this.executor = null;
+    }
+  }
+
+  public GPlusConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(GPlusConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * Set and overwrite the default before date that was read from the configuration file.
+   * @param defaultBeforeDate defaultBeforeDate
+   */
+  public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
+    this.config.setDefaultBeforeDate(defaultBeforeDate);
+  }
+
+  /**
+   * Set and overwrite the default after date that was read from the configuration file.
+   * @param defaultAfterDate defaultAfterDate
+   */
+  public void setDefaultAfterDate(DateTime defaultAfterDate) {
+    this.config.setDefaultAfterDate(defaultAfterDate);
+  }
+
+  /**
+   * Sets and overwrites the user info from the configuration file.  Uses the default before and after dates.
+   * @param userIds userIds
+   */
+  public void setUserInfoWithDefaultDates(Set<String> userIds) {
+    List<UserInfo> gplusUsers = new LinkedList<>();
+    for (String userId : userIds) {
+      UserInfo user = new UserInfo();
+      user.setUserId(userId);
+      user.setAfterDate(this.config.getDefaultAfterDate());
+      user.setBeforeDate(this.config.getDefaultBeforeDate());
+      gplusUsers.add(user);
+    }
+    this.config.setGooglePlusUsers(gplusUsers);
+  }
+
+  /**
+   * Set and overwrite user info from the configuration file. Only sets after date.
+   * @param usersAndAfterDates usersAndAfterDates
+   */
+  public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) {
+    List<UserInfo> gplusUsers = new LinkedList<>();
+    for (Map.Entry<String, DateTime> entry : usersAndAfterDates.entrySet()) {
+      UserInfo user = new UserInfo();
+      user.setUserId(entry.getKey());
+      user.setAfterDate(entry.getValue());
+      gplusUsers.add(user);
+    }
+    this.config.setGooglePlusUsers(gplusUsers);
+  }
+
+  /**
+   * Reports whether the provider may still produce data. Once the queue is empty and the
+   * collectors have finished, the provider is marked complete.
+   * @return false once the provider has been marked complete, true otherwise
+   */
+  @Override
+  public boolean isRunning() {
+    if (datumQueue.isEmpty() && collectorsFinished() && Futures.allAsList(futures).isDone()) {
+      LOGGER.info("Completed");
+      isComplete.set(true);
+      LOGGER.info("Exiting");
+    }
+    return !isComplete.get();
+  }
+
+  /**
+   * True when no collector can still be running: either the executor has terminated or it has
+   * already been released by {@link #cleanUp()}.
+   */
+  private boolean collectorsFinished() {
+    return this.executor == null || this.executor.isTerminated();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusActivitySerializer.java
new file mode 100644
index 0000000..74b0d8a
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusActivitySerializer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.provider;
+
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.Activity;
+
+import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil;
+import org.apache.commons.lang.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Serializer between Google Plus activities and as1 activities.
+ * Only the gplus-to-as1 direction for a single activity is implemented.
+ */
+public class GPlusActivitySerializer implements ActivitySerializer<com.google.api.services.plus.model.Activity> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
+
+  private static final String NOT_IMPLEMENTED = "Not currently implemented";
+
+  AbstractGPlusProvider provider;
+
+  /**
+   * Creates a serializer that is not associated with a provider.
+   */
+  public GPlusActivitySerializer() {
+  }
+
+  /**
+   * Creates a serializer associated with the given provider.
+   * @param provider AbstractGPlusProvider
+   */
+  public GPlusActivitySerializer(AbstractGPlusProvider provider) {
+    this.provider = provider;
+  }
+
+  /**
+   * Identifies the serialized format handled by this serializer.
+   * @return the format name
+   */
+  @Override
+  public String serializationFormat() {
+    return "gplus.v1";
+  }
+
+  /**
+   * Not supported: always throws NotImplementedException.
+   * @param deserialized as1 activity
+   * @return never returns normally
+   */
+  @Override
+  public com.google.api.services.plus.model.Activity serialize(Activity deserialized) {
+    throw new NotImplementedException(NOT_IMPLEMENTED);
+  }
+
+  /**
+   * Builds a new as1 activity and fills it from the given Google Plus activity.
+   * @param gplusActivity Google Plus activity
+   * @return the populated as1 activity
+   */
+  @Override
+  public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) {
+    final Activity as1Activity = new Activity();
+    GooglePlusActivityUtil.updateActivity(gplusActivity, as1Activity);
+    return as1Activity;
+  }
+
+  /**
+   * Not supported: always throws NotImplementedException.
+   * @param serializedList Google Plus activities
+   * @return never returns normally
+   */
+  @Override
+  public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) {
+    throw new NotImplementedException(NOT_IMPLEMENTED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusDataCollector.java
new file mode 100644
index 0000000..3726bc5
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusDataCollector.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.provider;
+
+import org.apache.streams.util.api.requests.backoff.BackOffException;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GPlusDataCollector collects GPlus Data on behalf of providers.
+ */
+public abstract class GPlusDataCollector implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusDataCollector.class);
+
+  /**
+   * Looks at the status code of the exception.  If the code indicates that the request should be retried,
+   * it executes the back off strategy and returns true.
+   * @param gjre GoogleJsonResponseException
+   * @param backOff BackOffStrategy
+   * @return returns true if the error code of the exception indicates the request should be retried.
+   */
+  public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, BackOffStrategy backOff) throws BackOffException {
+    boolean tryAgain = false;
+    switch (gjre.getStatusCode()) {
+      case 400 :
+        LOGGER.warn("Bad Request  : {}",  gjre);
+        break;
+      case 401 :
+        LOGGER.warn("Invalid Credentials : {}", gjre);
+        break;
+      case 403 :
+        LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage());
+        backOff.backOff();
+        tryAgain = true;
+        break;
+      case 503 :
+        LOGGER.warn("Google Backend Service Error : {}", gjre);
+        break;
+      default:
+        LOGGER.warn("Google Service returned error : {}", gjre);
+        tryAgain = true;
+        backOff.backOff();
+        break;
+    }
+    return tryAgain;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityCollector.java
new file mode 100644
index 0000000..e253a18
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityCollector.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.ActivityFeed;
+import org.apache.streams.gplus.serializer.util.GPlusActivityDeserializer;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Collects the public activities of a GPlus user. Has ability to filter by date ranges.
+ */
+public class GPlusUserActivityCollector extends GPlusDataCollector {
+
+  /**
+   * Key for all public activities
+   * https://developers.google.com/+/api/latest/activities/list
+   */
+  private static final String PUBLIC_COLLECTION = "public";
+  /**
+   * Max results allowed per request
+   * https://developers.google.com/+/api/latest/activities/list
+   */
+  private static final long MAX_RESULTS = 100;
+  /**
+   * Number of failed requests after which collection for the user stops.
+   */
+  private static final int MAX_ATTEMPTS = 5;
+  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityCollector.class);
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  static { //set up mapper for Google Activity Object
+    // NOTE(review): MAPPER comes from StreamsJacksonMapper.getInstance(); if that instance is
+    // shared, the module and feature set here affect other users of it - confirm.
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer());
+    simpleModule.addSerializer(
+        com.google.api.client.util.DateTime.class,
+        new StdSerializer<com.google.api.client.util.DateTime>(com.google.api.client.util.DateTime.class) {
+          @Override
+          public void serialize(
+              com.google.api.client.util.DateTime dateTime,
+              JsonGenerator jsonGenerator,
+              SerializerProvider serializerProvider)
+              throws IOException {
+            // Google DateTime values are written as RFC 3339 strings.
+            jsonGenerator.writeString(dateTime.toStringRfc3339());
+          }
+        });
+    MAPPER.registerModule(simpleModule);
+    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private BackOffStrategy backOff;
+  private Plus plus;
+  private UserInfo userInfo;
+
+  /**
+   * GPlusUserActivityCollector constructor.
+   * @param plus Plus
+   * @param datumQueue BlockingQueue<StreamsDatum>
+   * @param backOff BackOffStrategy
+   * @param userInfo UserInfo
+   */
+  public GPlusUserActivityCollector(Plus plus, BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo userInfo) {
+    this.plus = plus;
+    this.datumQueue = datumQueue;
+    this.backOff = backOff;
+    this.userInfo = userInfo;
+  }
+
+  @Override
+  public void run() {
+    collectActivityData();
+  }
+
+  /**
+   * Pages through the user's public activities and queues every activity whose publish time
+   * passes the user's after/before date filter as a JSON datum keyed by the activity id.
+   *
+   * <p/>
+   * Paging stops when there is no next page token, when an activity older than the after date
+   * is seen, or when MAX_ATTEMPTS requests have failed. Any failure other than a handled
+   * GoogleJsonResponseException ends collection for this user and is logged, not rethrown.
+   */
+  protected void collectActivityData() {
+    try {
+      ActivityFeed feed = null;
+      boolean tryAgain;
+      int attempt = 0;
+      DateTime afterDate = userInfo.getAfterDate();
+      DateTime beforeDate = userInfo.getBeforeDate();
+      do {
+        // Reset for every request: a stale "retry" flag from an earlier failure would otherwise
+        // keep the loop running (and re-requesting pages) after a successful pull.
+        tryAgain = false;
+        try {
+          if (feed == null) {
+            feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION)
+                .setMaxResults(MAX_RESULTS).execute();
+          } else {
+            feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION)
+                .setMaxResults(MAX_RESULTS)
+                .setPageToken(feed.getNextPageToken()).execute();
+          }
+          this.backOff.reset(); //successful pull reset api.
+          for (Activity activity : feed.getItems()) {
+            DateTime published = new DateTime(activity.getPublished().getValue());
+            if (isWithinDateRange(published, afterDate, beforeDate)) {
+              String json = MAPPER.writeValueAsString(activity);
+              this.datumQueue.put(new StreamsDatum(json, activity.getId()));
+            } else if (afterDate != null && afterDate.isAfter(published)) {
+              feed.setNextPageToken(null); // do not fetch next page
+              break;
+            }
+          }
+        } catch (GoogleJsonResponseException gjre) {
+          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
+          ++attempt;
+        }
+      }
+      while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS);
+    } catch (Throwable th) {
+      if (th instanceof InterruptedException) {
+        // Restore the interrupt flag so the executor can observe it.
+        Thread.currentThread().interrupt();
+      }
+      LOGGER.warn("Unable to pull Activities for user={} : {}", this.userInfo.getUserId(), th.getMessage(), th);
+    }
+  }
+
+  /**
+   * Decides whether a publish time passes the optional date filter. A null bound is treated as
+   * unbounded; a non-null bound must be strictly before (after date) or strictly after
+   * (before date) the publish time.
+   */
+  private static boolean isWithinDateRange(DateTime published, DateTime afterDate, DateTime beforeDate) {
+    boolean afterOk = afterDate == null || afterDate.isBefore(published);
+    boolean beforeOk = beforeDate == null || beforeDate.isAfter(published);
+    return afterOk && beforeOk;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityProvider.java
new file mode 100644
index 0000000..679660e
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityProvider.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.provider;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.google.api.services.plus.Plus;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *  Retrieve recent activity from a list of accounts.
+ *
+ *  <p/>
+ *  To use from command line:
+ *
+ *  <p/>
+ *  Supply (at least) the following required configuration in application.conf:
+ *
+ *  <p/>
+ *  gplus.oauth.pathToP12KeyFile
+ *  gplus.oauth.serviceAccountEmailAddress
+ *  gplus.oauth.appName
+ *  gplus.apiKey
+ *  gplus.googlePlusUsers
+ *
+ *  <p/>
+ *  Launch using:
+ *
+ *  <p/>
+ *  mvn exec:java -Dexec.mainClass=org.apache.streams.gplus.provider.GPlusUserActivityProvider -Dexec.args="application.conf activity.json"
+ */
+public class GPlusUserActivityProvider extends AbstractGPlusProvider {
+
+  private static final String STREAMS_ID = "GPlusUserActivityProvider";
+
+  public GPlusUserActivityProvider() {
+    super();
+  }
+
+  public GPlusUserActivityProvider(GPlusConfiguration config) {
+    super(config);
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  /**
+   * Creates the collector that pulls the public activities of one user.
+   */
+  @Override
+  protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
+    return new GPlusUserActivityCollector(plus, queue, strategy, userInfo);
+  }
+
+  /**
+   * Retrieve recent activity from a list of accounts.
+   *
+   * <p/>
+   * Expects two arguments: the path of the configuration file and the path of the output file.
+   * Each datum is written to the output file as one line of JSON.
+   * @param args args
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    // An assert would be skipped unless the JVM runs with -ea, so check explicitly.
+    Preconditions.checkArgument(file.exists(), "config file does not exist: %s", configfile);
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus");
+    GPlusUserActivityProvider provider = new GPlusUserActivityProvider(config);
+
+    Gson gson = new Gson();
+
+    // try-with-resources closes (and thereby flushes) the output file even when reading fails.
+    try (PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)))) {
+      provider.prepare(config);
+      try {
+        provider.startStream();
+        do {
+          Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+          for (StreamsDatum datum : provider.readCurrent()) {
+            String json;
+            if (datum.getDocument() instanceof String) {
+              json = (String) datum.getDocument();
+            } else {
+              json = gson.toJson(datum.getDocument());
+            }
+            outStream.println(json);
+          }
+        }
+        while ( provider.isRunning());
+      } finally {
+        // Only reached after prepare() succeeded, so there is an executor to shut down.
+        provider.cleanUp();
+      }
+      outStream.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserDataCollector.java
new file mode 100644
index 0000000..2cbbc8c
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserDataCollector.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.Person;
+import org.apache.streams.gplus.serializer.util.GPlusPersonDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Collects user profile information for a specific GPlus user.
+ *
+ * <p>A single run requests the profile of the configured user through the {@link Plus} client,
+ * serializes it to JSON and puts it on the supplied queue wrapped in a {@link StreamsDatum}.
+ */
+public class GPlusUserDataCollector extends GPlusDataCollector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataCollector.class);
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  /** Maximum number of profile requests issued before giving up on a user. */
+  private static final int MAX_ATTEMPTS = 5;
+
+  static { //set up Mapper for Person objects
+    // NOTE(review): MAPPER is obtained from StreamsJacksonMapper.getInstance(); if that instance is
+    // shared, the module and feature configured here affect its other users too - confirm intended.
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
+    MAPPER.registerModule(simpleModule);
+    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private final BackOffStrategy backOffStrategy;
+  private final Plus plus;
+  private final BlockingQueue<StreamsDatum> datumQueue;
+  private final UserInfo userInfo;
+
+  /**
+   * GPlusUserDataCollector constructor.
+   * @param plus Plus client used to request the profile
+   * @param backOffStrategy BackOffStrategy applied between failed requests
+   * @param datumQueue BlockingQueue of StreamsDatum that receives the collected profile
+   * @param userInfo UserInfo identifying the user to collect
+   */
+  public GPlusUserDataCollector(Plus plus, BackOffStrategy backOffStrategy, BlockingQueue<StreamsDatum> datumQueue, UserInfo userInfo) {
+    this.plus = plus;
+    this.backOffStrategy = backOffStrategy;
+    this.datumQueue = datumQueue;
+    this.userInfo = userInfo;
+  }
+
+  /**
+   * Requests the user's profile, retrying up to {@link #MAX_ATTEMPTS} times, and queues it as JSON.
+   *
+   * <p>Failures are logged rather than propagated. If no profile could be obtained nothing is
+   * queued. The interrupt flag is restored when the thread is interrupted while queueing.
+   */
+  protected void queueUserHistory() {
+    try {
+      boolean tryAgain;
+      int attempts = 0;
+      Person person = null;
+      do {
+        try {
+          person = this.plus.people().get(userInfo.getUserId()).execute();
+          this.backOffStrategy.reset();
+          tryAgain = person == null;
+        } catch (GoogleJsonResponseException gjre) {
+          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOffStrategy);
+        }
+        ++attempts;
+      } while (tryAgain && attempts < MAX_ATTEMPTS);
+
+      // The loop can end without a profile (retries exhausted or a non-retryable error).
+      // Bail out explicitly instead of failing with a NullPointerException on person.getId().
+      if (person == null) {
+        LOGGER.warn("Unable to pull user data for user={} after {} attempts", userInfo.getUserId(), attempts);
+        return;
+      }
+
+      String json = MAPPER.writeValueAsString(person);
+      this.datumQueue.put(new StreamsDatum(json, person.getId()));
+    } catch (Throwable throwable) {
+      // Pass the throwable as the trailing argument without a placeholder so the stack trace is logged.
+      LOGGER.warn("Unable to pull user data for user={}", userInfo.getUserId(), throwable);
+      if (throwable instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    queueUserHistory();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserDataProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserDataProvider.java
new file mode 100644
index 0000000..14bf472
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserDataProvider.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.provider;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.google.api.services.plus.Plus;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ *  Retrieve current profile status for a list of accounts.
+ *
+ *  <p/>
+ *  To use from command line:
+ *
+ *  <p/>
+ *  Supply (at least) the following required configuration in application.conf:
+ *
+ *  <p/>
+ *  gplus.oauth.pathToP12KeyFile
+ *  gplus.oauth.serviceAccountEmailAddress
+ *  gplus.apiKey
+ *  gplus.googlePlusUsers
+ *
+ *  <p/>
+ *  Launch using:
+ *
+ *  <p/>
+ *  mvn exec:java -Dexec.mainClass=org.apache.streams.gplus.provider.GPlusUserDataProvider -Dexec.args="application.conf profiles.json"
+ */
+public class GPlusUserDataProvider extends AbstractGPlusProvider {
+
+  public static final String STREAMS_ID = "GPlusUserDataProvider";
+
+  public GPlusUserDataProvider() {
+    super();
+  }
+
+  public GPlusUserDataProvider(GPlusConfiguration config) {
+    super(config);
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  /**
+   * Creates the collector that fetches the profile of a single user.
+   * @param strategy BackOffStrategy handed to the collector
+   * @param queue queue the collector writes its datum to
+   * @param plus Plus client handed to the collector
+   * @param userInfo user to collect
+   * @return a new {@link GPlusUserDataCollector}
+   */
+  @Override
+  protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
+    return new GPlusUserDataCollector(plus, strategy, queue, userInfo);
+  }
+
+  /**
+   * Retrieve current profile status for a list of accounts.
+   *
+   * <p>Reads the configuration file named by {@code args[0]}, runs the provider until it reports
+   * that it is no longer running, and writes one JSON document per line to the file named by
+   * {@code args[1]}.
+   *
+   * @param args args: configuration file path, output file path
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    MatcherAssert.assertThat(args.length, Matchers.greaterThanOrEqualTo(2));
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus");
+    GPlusUserDataProvider provider = new GPlusUserDataProvider(config);
+
+    Gson gson = new Gson();
+
+    // try-with-resources guarantees the output file is flushed and closed even if the run fails.
+    try (PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)))) {
+      provider.prepare(config);
+      provider.startStream();
+      try {
+        do {
+          Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+          for (StreamsDatum datum : provider.readCurrent()) {
+            String json;
+            if (datum.getDocument() instanceof String) {
+              json = (String) datum.getDocument();
+            } else {
+              json = gson.toJson(datum.getDocument());
+            }
+            outStream.println(json);
+          }
+        } while (provider.isRunning());
+      } finally {
+        // Release provider resources even when reading or writing throws.
+        provider.cleanUp();
+      }
+      outStream.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusActivityDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusActivityDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusActivityDeserializer.java
new file mode 100644
index 0000000..585e511
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusActivityDeserializer.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.serializer.util;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.plus.model.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Custom deserializer for GooglePlus' Activity model.
+ */
+public class GPlusActivityDeserializer extends JsonDeserializer<Activity> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivityDeserializer.class);
+
+  /**
+   * Because the GooglePlus Activity object {@link com.google.api.services.plus.model.Activity} contains complex objects
+   * within its hierarchy, we have to use a custom deserializer.
+   *
+   * <p>Exceptions raised while copying fields are logged and swallowed, so the returned
+   * activity may be only partially populated.
+   *
+   * @param jsonParser jsonParser
+   * @param deserializationContext deserializationContext
+   * @return The deserialized {@link com.google.api.services.plus.model.Activity} object
+   * @throws IOException if the JSON tree cannot be read from the parser
+   */
+  @Override
+  public Activity deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+
+    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+    Activity activity = new Activity();
+
+    try {
+      activity.setUrl(node.get("url").asText());
+      activity.setEtag(node.get("etag").asText());
+      activity.setTitle(node.get("title").asText());
+      activity.setPublished(DateTime.parseRfc3339(node.get("published").asText()));
+      activity.setUpdated(DateTime.parseRfc3339(node.get("updated").asText()));
+      activity.setId(node.get("id").asText());
+      activity.setVerb(node.get("verb").asText());
+
+      activity.setActor(buildActor(node));
+
+      activity.setObject(buildPlusObject(node));
+    } catch (Exception ex) {
+      // Throwable goes last without a placeholder so the logger records the stack trace.
+      LOGGER.error("Exception while trying to deserialize activity object", ex);
+    }
+
+    return activity;
+  }
+
+  /**
+   * Given a raw JsonNode, build out the G+ {@link com.google.api.services.plus.model.Activity.Actor} object.
+   *
+   * @param node node holding the activity; its "actor" child is read
+   * @return {@link com.google.api.services.plus.model.Activity.Actor} object
+   */
+  private Activity.Actor buildActor(JsonNode node) {
+    Activity.Actor actor = new Activity.Actor();
+    JsonNode actorNode = node.get("actor");
+
+    actor.setId(actorNode.get("id").asText());
+    actor.setDisplayName(actorNode.get("displayName").asText());
+    actor.setUrl(actorNode.get("url").asText());
+
+    // A missing image must not discard the rest of the actor.
+    if (actorNode.has("image")) {
+      Activity.Actor.Image image = new Activity.Actor.Image();
+      JsonNode imageNode = actorNode.get("image");
+      image.setUrl(imageNode.get("url").asText());
+      actor.setImage(image);
+    }
+
+    return actor;
+  }
+
+  /**
+   * Given a JsonNode, build out all aspects of the {@link com.google.api.services.plus.model.Activity.PlusObject} object.
+   *
+   * <p>The replies, plusoners and resharers sections are copied only when present.
+   *
+   * @param node node holding the activity; its "object" child is read
+   * @return {@link com.google.api.services.plus.model.Activity.PlusObject} object
+   */
+  private Activity.PlusObject buildPlusObject(JsonNode node) {
+    Activity.PlusObject object = new Activity.PlusObject();
+    JsonNode objectNode = node.get("object");
+    object.setObjectType(objectNode.get("objectType").asText());
+    object.setContent(objectNode.get("content").asText());
+    object.setUrl(objectNode.get("url").asText());
+
+    if (objectNode.has("replies")) {
+      Activity.PlusObject.Replies replies = new Activity.PlusObject.Replies();
+      JsonNode repliesNode = objectNode.get("replies");
+      replies.setTotalItems(repliesNode.get("totalItems").asLong());
+      replies.setSelfLink(repliesNode.get("selfLink").asText());
+      object.setReplies(replies);
+    }
+
+    if (objectNode.has("plusoners")) {
+      Activity.PlusObject.Plusoners plusoners = new Activity.PlusObject.Plusoners();
+      JsonNode plusonersNode = objectNode.get("plusoners");
+      plusoners.setTotalItems(plusonersNode.get("totalItems").asLong());
+      plusoners.setSelfLink(plusonersNode.get("selfLink").asText());
+      object.setPlusoners(plusoners);
+    }
+
+    if (objectNode.has("resharers")) {
+      Activity.PlusObject.Resharers resharers = new Activity.PlusObject.Resharers();
+      JsonNode resharersNode = objectNode.get("resharers");
+      resharers.setTotalItems(resharersNode.get("totalItems").asLong());
+      resharers.setSelfLink(resharersNode.get("selfLink").asText());
+      object.setResharers(resharers);
+    }
+
+    object.setAttachments(buildAttachments(objectNode));
+
+    return object;
+  }
+
+  /**
+   * Given a raw JsonNode representation of an Activity's attachments, build out that
+   * list of {@link com.google.api.services.plus.model.Activity.PlusObject.Attachments} objects.
+   *
+   * @param objectNode objectNode whose "attachments" child, if any, is read
+   * @return list of {@link com.google.api.services.plus.model.Activity.PlusObject.Attachments} objects;
+   *     empty when the node has no attachments
+   */
+  private List<Activity.PlusObject.Attachments> buildAttachments(JsonNode objectNode) {
+    List<Activity.PlusObject.Attachments> attachments = new ArrayList<>();
+    if (objectNode.has("attachments")) {
+      for (JsonNode attachmentNode : objectNode.get("attachments")) {
+        Activity.PlusObject.Attachments attachment = new Activity.PlusObject.Attachments();
+        attachment.setObjectType(attachmentNode.get("objectType").asText());
+        if (attachmentNode.has("displayName")) {
+          attachment.setDisplayName(attachmentNode.get("displayName").asText());
+        }
+        if (attachmentNode.has("content")) {
+          attachment.setContent(attachmentNode.get("content").asText());
+        }
+        if (attachmentNode.has("url")) {
+          attachment.setUrl(attachmentNode.get("url").asText());
+        }
+
+        if (attachmentNode.has("image")) {
+          Activity.PlusObject.Attachments.Image image = new Activity.PlusObject.Attachments.Image();
+          JsonNode imageNode = attachmentNode.get("image");
+          image.setUrl(imageNode.get("url").asText());
+          attachment.setImage(image);
+        }
+
+        attachments.add(attachment);
+      }
+    }
+    return attachments;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusCommentDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusCommentDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusCommentDeserializer.java
new file mode 100644
index 0000000..5d6a982
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusCommentDeserializer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.serializer.util;
+
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.util.DateTime;
+import com.google.api.services.plus.model.Comment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Custom deserializer that builds a GooglePlus {@link Comment} model object from its JSON form.
+ */
+public class GPlusCommentDeserializer extends JsonDeserializer<Comment> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusCommentDeserializer.class);
+
+  /**
+   * Because the GooglePlus Comment object {@link com.google.api.services.plus.model.Comment} contains complex objects
+   * within its hierarchy, we have to use a custom deserializer.
+   *
+   * <p>Exceptions raised while copying fields are logged and swallowed, so the returned
+   * comment may be only partially populated.
+   *
+   * @param jsonParser jsonParser
+   * @param deserializationContext deserializationContext
+   * @return The deserialized {@link com.google.api.services.plus.model.Comment} object
+   * @throws java.io.IOException if the JSON tree cannot be read from the parser
+   */
+  @Override
+  public Comment deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+      throws IOException {
+
+    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+    ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();
+    Comment comment = new Comment();
+
+    try {
+      comment.setEtag(node.get("etag").asText());
+      comment.setVerb(node.get("verb").asText());
+      comment.setId(node.get("id").asText());
+      comment.setPublished(DateTime.parseRfc3339(node.get("published").asText()));
+      comment.setUpdated(DateTime.parseRfc3339(node.get("updated").asText()));
+
+      Comment.Actor actor = new Comment.Actor();
+      JsonNode actorNode = node.get("actor");
+      if (actorNode.has("id")) {
+        actor.setId(actorNode.get("id").asText());
+      }
+      actor.setDisplayName(actorNode.get("displayName").asText());
+      actor.setUrl(actorNode.get("url").asText());
+
+      Comment.Actor.Image image = new Comment.Actor.Image();
+      JsonNode imageNode = actorNode.get("image");
+      image.setUrl(imageNode.get("url").asText());
+
+      actor.setImage(image);
+      // The actor was previously built but never attached to the comment.
+      comment.setActor(actor);
+
+      comment.setObject(objectMapper.readValue(objectMapper.writeValueAsString(node.get("object")), Comment.PlusObject.class));
+
+      comment.setSelfLink(node.get("selfLink").asText());
+
+      List<Comment.InReplyTo> replies = new ArrayList<>();
+      // A comment without "inReplyTo" keeps an empty list instead of aborting the remaining fields.
+      if (node.has("inReplyTo")) {
+        for (JsonNode reply : node.get("inReplyTo")) {
+          Comment.InReplyTo irt = objectMapper.readValue(objectMapper.writeValueAsString(reply), Comment.InReplyTo.class);
+          replies.add(irt);
+        }
+      }
+
+      comment.setInReplyTo(replies);
+
+      if (node.has("plusoners")) {
+        Comment.Plusoners plusoners = new Comment.Plusoners();
+        JsonNode plusonersNode = node.get("plusoners");
+        plusoners.setTotalItems(plusonersNode.get("totalItems").asLong());
+        comment.setPlusoners(plusoners);
+      }
+    } catch (Exception ex) {
+      // Throwable goes last without a placeholder so the logger records the stack trace.
+      LOGGER.error("Exception while trying to deserialize comment object", ex);
+    }
+
+    return comment;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusEventClassifier.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusEventClassifier.java
new file mode 100644
index 0000000..f6fa524
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusEventClassifier.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.serializer.util;
+
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.Person;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * GPlusEventClassifier classifies GPlus Events.
+ *
+ * <p>Classification is based on the top-level "kind" field of the JSON document.
+ */
+public class GPlusEventClassifier implements Serializable {
+
+  // Fully qualified so this class needs no additional import lines.
+  private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(GPlusEventClassifier.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  // The identifiers include the surrounding quotes because they are compared against
+  // the JSON rendering (toString) of the "kind" node.
+  private static final String ACTIVITY_IDENTIFIER = "\"plus#activity\"";
+  private static final String PERSON_IDENTIFIER = "\"plus#person\"";
+
+  /**
+   * Detect likely class of String json.
+   *
+   * <p>The result is cast to an object node without a type check, so JSON that parses to a
+   * non-object (array, scalar) still raises a ClassCastException.
+   *
+   * @param json String json; must be neither null nor empty
+   * @return {@code Activity.class} or {@code Person.class} when the top-level "kind" matches,
+   *     {@code ObjectNode.class} for any other JSON object, or null when the json cannot be parsed
+   */
+  public static Class detectClass(String json) {
+    Objects.requireNonNull(json);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+    ObjectNode objectNode;
+    try {
+      objectNode = (ObjectNode) mapper.readTree(json);
+    } catch (IOException ex) {
+      LOGGER.warn("Unable to parse json while detecting its class", ex);
+      return null;
+    }
+
+    // Only the top-level "kind" is inspected. The previous findValue() guard searched nested
+    // objects as well, so a document with only a nested "kind" passed the guard and then
+    // failed with a NullPointerException on the top-level lookup.
+    String kind = objectNode.has("kind") ? objectNode.get("kind").toString() : null;
+
+    if (ACTIVITY_IDENTIFIER.equals(kind)) {
+      return Activity.class;
+    } else if (PERSON_IDENTIFIER.equals(kind)) {
+      return Person.class;
+    } else {
+      return ObjectNode.class;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusPersonDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusPersonDeserializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusPersonDeserializer.java
new file mode 100644
index 0000000..8f7ae72
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GPlusPersonDeserializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.serializer.util;
+
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.plus.model.Person;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Custom deserializer for GooglePlus' Person model.
+ */
+public class GPlusPersonDeserializer extends JsonDeserializer<Person> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GPlusPersonDeserializer.class);
+
+  /**
+   * Because the GooglePlus Person object contains complex objects within its hierarchy, we have to use
+   * a custom deserializer.
+   *
+   * <p>Exceptions raised while copying fields are logged and swallowed, so the returned
+   * person may be only partially populated.
+   *
+   * @param jsonParser jsonParser
+   * @param deserializationContext deserializationContext
+   * @return The deserialized {@link com.google.api.services.plus.model.Person} object
+   * @throws IOException if the JSON tree cannot be read from the parser
+   */
+  @Override
+  public Person deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+    Person person = new Person();
+    try {
+      person.setId(node.get("id").asText());
+      // asInt() avoids the ClassCastException the former (Integer) cast raised for non-int numbers;
+      // an absent count no longer aborts the remaining fields.
+      if (node.has("circledByCount")) {
+        person.setCircledByCount(node.get("circledByCount").asInt());
+      }
+      person.setDisplayName(node.get("displayName").asText());
+      if (node.has("etag")) {
+        person.setEtag(node.get("etag").asText());
+      }
+      if (node.has("gender")) {
+        person.setGender(node.get("gender").asText());
+      }
+
+      if (node.has("image")) {
+        Person.Image image = new Person.Image();
+        JsonNode imageNode = node.get("image");
+        image.setIsDefault(imageNode.get("isDefault").asBoolean());
+        image.setUrl(imageNode.get("url").asText());
+        person.setImage(image);
+      }
+
+      person.setIsPlusUser(node.get("isPlusUser").asBoolean());
+      person.setKind(node.get("kind").asText());
+
+      JsonNode nameNode = node.get("name");
+      Person.Name name = mapper.readValue(mapper.writeValueAsString(nameNode), Person.Name.class);
+      person.setName(name);
+
+      person.setObjectType(node.get("objectType").asText());
+
+      if (node.has("organizations")) {
+        List<Person.Organizations> organizations = new ArrayList<>();
+        for (JsonNode orgNode : node.get("organizations")) {
+          Person.Organizations org = mapper.readValue(mapper.writeValueAsString(orgNode), Person.Organizations.class);
+          organizations.add(org);
+        }
+        person.setOrganizations(organizations);
+      }
+
+      person.setUrl(node.get("url").asText());
+      person.setVerified(node.get("verified").asBoolean());
+
+      if (node.has("emails")) {
+        List<Person.Emails> emails = new ArrayList<>();
+        for (JsonNode emailNode : node.get("emails")) {
+          Person.Emails email = mapper.readValue(mapper.writeValueAsString(emailNode), Person.Emails.class);
+          emails.add(email);
+        }
+        // The parsed e-mail addresses were previously collected but never stored on the person.
+        person.setEmails(emails);
+      }
+
+      if (node.has("tagline")) {
+        person.setTagline(node.get("tagline").asText());
+      }
+      if (node.has("aboutMe")) {
+        person.setAboutMe(node.get("aboutMe").asText());
+      }
+    } catch (Exception ex) {
+      // Throwable goes last without a placeholder so the logger records the stack trace.
+      LOGGER.error("Exception while trying to deserialize a Person object", ex);
+    }
+
+    return person;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GooglePlusActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GooglePlusActivityUtil.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GooglePlusActivityUtil.java
new file mode 100644
index 0000000..7039266
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/serializer/util/GooglePlusActivityUtil.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.serializer.util;
+
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.pojo.json.Provider;
+
+import com.google.api.services.plus.model.Comment;
+import com.google.api.services.plus.model.Person;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * GooglePlusActivityUtil helps convert c.g.Person and c.g.Activity into o.a.s.p.j.o.Page and o.a.s.p.j.Activity.
+ */
+public class GooglePlusActivityUtil {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusActivityUtil.class);
+
+  /**
+   * Populates an {@link Activity} from a Google Plus {@link Person} profile.
+   *
+   * <p>The actor is built from the person, the verb is set to "update", the id is
+   * formatted from that verb plus the person id, and the GooglePlus provider is attached.
+   *
+   * @param item Person the profile to read from
+   * @param activity Activity the activity to fill in
+   * @throws ActivitySerializerException declared by the signature for callers
+   */
+  public static void updateActivity(Person item, Activity activity) throws ActivitySerializerException {
+    final String verb = "update";
+
+    activity.setActor(buildActor(item));
+    activity.setVerb(verb);
+    // The person id is passed through untouched; a null id is rendered by formatId as "null".
+    activity.setId(formatId(activity.getVerb(), item.getId()));
+    activity.setProvider(getProvider());
+  }
+
+  /**
+   * Attaches every {@link Comment} in the list to the given {@link Activity}
+   * and records how many comments were supplied under the "comment_count" extension.
+   *
+   * @param comments input List of Comment
+   * @param activity output Activity
+   */
+  public static void updateActivity(List<Comment> comments, Activity activity) {
+    comments.forEach(comment -> addComment(activity, comment));
+
+    ExtensionUtil.getInstance()
+        .ensureExtensions(activity)
+        .put("comment_count", comments.size());
+  }
+
+  /**
+   * Given a Google Plus {@link com.google.api.services.plus.model.Activity},
+   * convert that into an Activity streams formatted {@link Activity}.
+   *
+   * <p>Sets actor, verb ("post"), title, url, provider, content, id, published date,
+   * object and the Google Plus extensions. The published date is only set when the
+   * source activity carries one.
+   *
+   * @param gPlusActivity input c.g.a.s.p.m.Activity
+   * @param activity output o.a.s.p.j.Activity
+   */
+  public static void updateActivity(com.google.api.services.plus.model.Activity gPlusActivity, Activity activity) {
+    activity.setActor(buildActor(gPlusActivity.getActor()));
+    activity.setVerb("post");
+    activity.setTitle(gPlusActivity.getTitle());
+    activity.setUrl(gPlusActivity.getUrl());
+    activity.setProvider(getProvider());
+
+    if (gPlusActivity.getObject() != null) {
+      activity.setContent(gPlusActivity.getObject().getContent());
+    }
+
+    activity.setId(formatId(activity.getVerb(), gPlusActivity.getId()));
+
+    // String.valueOf(null) yields the literal "null", which Joda-Time cannot parse,
+    // so the timestamp is converted only when one is actually present.
+    Optional.ofNullable(gPlusActivity.getPublished())
+        .map(timestamp -> new DateTime(timestamp.toString()))
+        .ifPresent(activity::setPublished);
+
+    setObject(activity, gPlusActivity.getObject());
+    addGPlusExtensions(activity, gPlusActivity);
+  }
+
+  /**
+   * Adds a single {@link Comment} to the Object.Attachments
+   * section of the passed in {@link Activity}.
+   *
+   * <p>The activity's object and its attachment list are created on demand. Timestamps
+   * and the comment's inner object are optional: missing values are left unset on the
+   * attachment instead of aborting the conversion.
+   *
+   * @param activity output o.a.s.p.j.Activity
+   * @param comment input c.g.a.s.p.m.Comment
+   */
+  private static void addComment(Activity activity, Comment comment) {
+    ActivityObject obj = new ActivityObject();
+
+    obj.setId(comment.getId());
+
+    // String.valueOf(null) would produce "null", which DateTime rejects; skip absent timestamps.
+    if (comment.getPublished() != null) {
+      obj.setPublished(new DateTime(comment.getPublished().toString()));
+    }
+    if (comment.getUpdated() != null) {
+      obj.setUpdated(new DateTime(comment.getUpdated().toString()));
+    }
+
+    // The inner object may be absent; dereferencing it blindly would throw a NullPointerException.
+    if (comment.getObject() != null) {
+      obj.setContent(comment.getObject().getContent());
+      obj.setObjectType(comment.getObject().getObjectType());
+    }
+
+    // Keep the raw Google Plus comment alongside the converted fields.
+    Map<String, Object> extensions = new HashMap<>();
+    extensions.put("googlePlus", comment);
+
+    obj.setAdditionalProperty("extensions", extensions);
+
+    if (activity.getObject() == null) {
+      activity.setObject(new ActivityObject());
+    }
+    if (activity.getObject().getAttachments() == null) {
+      activity.getObject().setAttachments(new ArrayList<>());
+    }
+
+    activity.getObject().getAttachments().add(obj);
+  }
+
+  /**
+   * Copies Google Plus specific data from the source
+   * {@link com.google.api.services.plus.model.Activity} onto the {@link Activity}.
+   *
+   * <p>The raw source activity is stored under the "googlePlus" additional property. When the
+   * source has an object, its plus-one total, reshare total and content are written to the
+   * "likes", "rebroadcasts" and "keywords" extensions respectively.
+   *
+   * @param activity output o.a.s.p.j.Activity
+   * @param gPlusActivity input c.g.a.s.p.m.Activity
+   */
+  private static void addGPlusExtensions(Activity activity, com.google.api.services.plus.model.Activity gPlusActivity) {
+    activity.getAdditionalProperties().put("googlePlus", gPlusActivity);
+
+    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+
+    com.google.api.services.plus.model.Activity.PlusObject plusObject = gPlusActivity.getObject();
+    if (plusObject == null) {
+      // Nothing further to extract without an object.
+      return;
+    }
+
+    if (plusObject.getPlusoners() != null) {
+      extensions.put("likes", singleCount(plusObject.getPlusoners().getTotalItems()));
+    }
+    if (plusObject.getResharers() != null) {
+      extensions.put("rebroadcasts", singleCount(plusObject.getResharers().getTotalItems()));
+    }
+    extensions.put("keywords", plusObject.getContent());
+  }
+
+  /**
+   * Wraps a total in a fresh mutable map under the "count" key.
+   *
+   * @param total the value to store
+   * @return a new map containing only the "count" entry
+   */
+  private static Map<String, Object> singleCount(Object total) {
+    Map<String, Object> counts = new HashMap<>();
+    counts.put("count", total);
+    return counts;
+  }
+
+  /**
+   * Set the {@link ActivityObject} field given the passed in
+   * {@link com.google.api.services.plus.model.Activity.PlusObject}.
+   *
+   * <p>Does nothing when {@code plusObject} is null. Otherwise the activity's object receives
+   * the content, object type and one attachment per Google Plus attachment. A source object
+   * without attachments results in an empty attachment list.
+   *
+   * @param activity output $.object as o.a.s.p.j.ActivityObject
+   * @param plusObject input c.g.a.s.p.m.Activity.PlusObject
+   */
+  private static void setObject(Activity activity, com.google.api.services.plus.model.Activity.PlusObject plusObject) {
+    if (plusObject == null) {
+      return;
+    }
+
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setContent(plusObject.getContent());
+    activityObject.setObjectType(plusObject.getObjectType());
+
+    List<ActivityObject> attachmentsList = new ArrayList<>();
+
+    // getAttachments() is null for posts without attachments; iterating it directly would throw.
+    List<com.google.api.services.plus.model.Activity.PlusObject.Attachments> plusAttachments =
+        plusObject.getAttachments();
+    if (plusAttachments != null) {
+      for (com.google.api.services.plus.model.Activity.PlusObject.Attachments attachments : plusAttachments) {
+        ActivityObject attach = new ActivityObject();
+
+        attach.setContent(attachments.getContent());
+        attach.setDisplayName(attachments.getDisplayName());
+        attach.setObjectType(attachments.getObjectType());
+        attach.setUrl(attachments.getUrl());
+
+        // An image is attached only when the source attachment has one.
+        com.google.api.services.plus.model.Activity.PlusObject.Attachments.Image sourceImage = attachments.getImage();
+        if (sourceImage != null) {
+          Image image = new Image();
+          image.setUrl(sourceImage.getUrl());
+          attach.setImage(image);
+        }
+
+        attachmentsList.add(attach);
+      }
+    }
+
+    activityObject.setAttachments(attachmentsList);
+
+    activity.setObject(activityObject);
+  }
+
+  /**
+   * Converts a {@link com.google.api.services.plus.model.Activity.Actor} into an
+   * {@link ActivityObject} actor carrying display name, formatted id, url and image.
+   *
+   * @param gPlusActor input c.g.a.s.p.m.Activity.Actor
+   * @return {@link ActivityObject} output $.actor as o.a.s.p.j.ActivityObject
+   */
+  private static ActivityObject buildActor(com.google.api.services.plus.model.Activity.Actor gPlusActor) {
+    // The actor always receives an Image; its url simply stays unset when the source has none.
+    Image avatar = new Image();
+    if (gPlusActor.getImage() != null) {
+      avatar.setUrl(gPlusActor.getImage().getUrl());
+    }
+
+    ActivityObject result = new ActivityObject();
+    result.setId(formatId(String.valueOf(gPlusActor.getId())));
+    result.setDisplayName(gPlusActor.getDisplayName());
+    result.setUrl(gPlusActor.getUrl());
+    result.setImage(avatar);
+    return result;
+  }
+
+  /**
+   * Builds an actor {@link ActivityObject} from a Google Plus {@link Person}.
+   *
+   * <p>The summary prefers the person's "about me" text and falls back to the tagline.
+   * The circled-by count and the raw person are stored under the actor's "extensions".
+   *
+   * @param person Person
+   * @return Actor constructed with relevant Person details
+   */
+  private static ActivityObject buildActor(Person person) {
+    ActivityObject result = new ActivityObject();
+
+    result.setId(formatId(String.valueOf(person.getId())));
+    result.setDisplayName(person.getDisplayName());
+    result.setUrl(person.getUrl());
+
+    // Prefer aboutMe, then tagline; leave the summary untouched when both are missing.
+    String summary = person.getAboutMe() != null ? person.getAboutMe() : person.getTagline();
+    if (summary != null) {
+      result.setSummary(summary);
+    }
+
+    // The actor always receives an Image; its url stays unset when the person has none.
+    Image avatar = new Image();
+    if (person.getImage() != null) {
+      avatar.setUrl(person.getImage().getUrl());
+    }
+    result.setImage(avatar);
+
+    Map<String, Object> actorExtensions = new HashMap<>();
+    actorExtensions.put("followers", person.getCircledByCount());
+    actorExtensions.put("googleplus", person);
+    result.setAdditionalProperty("extensions", actorExtensions);
+
+    return result;
+  }
+
+  /**
+   * Creates the {@link Provider} that identifies Google Plus as the data source.
+   * A new instance is built on every call.
+   *
+   * @return a provider object representing GooglePlus
+   */
+  public static Provider getProvider() {
+    final Provider googlePlus = new Provider();
+    googlePlus.setDisplayName("GooglePlus");
+    googlePlus.setId("id:providers:googleplus");
+    return googlePlus;
+  }
+
+  /**
+   * Builds an Apache Streams activity id by prefixing "id:googleplus" to the given parts
+   * and joining everything with ":".
+   *
+   * @param idparts the parts of the ID to join
+   * @return a valid Activity ID in format "id:googleplus:part1:part2:...partN"
+   */
+  public static String formatId(String... idparts) {
+    return Stream.concat(Stream.of("id:googleplus"), Arrays.stream(idparts))
+        .collect(Collectors.joining(":"));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
deleted file mode 100644
index 3b7795b..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
+++ /dev/null
@@ -1,101 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.google.gplus.GPlusConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "protocol": {
-            "type": "string",
-            "description": "The protocol"
-        },
-        "host": {
-            "type": "string",
-            "description": "The host"
-        },
-        "port": {
-            "type": "integer",
-            "description": "The port"
-        },
-        "version": {
-            "type": "string",
-            "description": "The version"
-        },
-        "endpoint": {
-            "type": "string",
-            "description": "The endpoint"
-        },
-        "follow": {
-            "type": "array",
-            "description": "DEPRECATED. A list of user names, indicating the users whose activities should be delivered on the stream",
-            "items": {
-                "type": "string"
-            }
-        },
-        "googlePlusUsers": {
-            "type": "array",
-            "description": "A list of user user ids and optional date parameters for the GPlus provider",
-            "items": {
-                "type": "object",
-                "$ref": "#/definitions/userInfo"
-            }
-        },
-        "defaultAfterDate": {
-            "type": "string",
-            "format": "date-time",
-            "description": "Optional parameter for the provider. If this value is not null an the afterDate value in the userInfo is null, this value will be used."
-        },
-        "defaultBeforeDate": {
-            "type": "string",
-            "format": "date-time",
-            "description": "Optional parameter for the provider. If this value is not null and the beforeDate value in the userInfo is null, this value will be used."
-        },
-        "oauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.google.gplus.GPlusOAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "description": "DEPRICATED",
-            "properties": {
-                "appName": {
-                    "type": "string"
-                },
-                "pathToP12KeyFile": {
-                    "type": "string",
-                    "description": "Absolute Path to key file"
-                },
-                "serviceAccountEmailAddress": {
-                    "type": "string",
-                    "description": "Service Account email address for your app"
-                }
-            }
-        }
-    },
-    "definitions": {
-        "userInfo": {
-            "type": "object",
-            "javaInterfaces" : ["java.io.Serializable"],
-            "dynamic": "true",
-            "javaType": "org.apache.streams.google.gplus.configuration.UserInfo",
-            "properties": {
-                "userId": {
-                    "type": "string",
-                    "description": "Google+ user id"
-                },
-                "afterDate": {
-                    "type": "string",
-                    "format": "date-time",
-                    "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultBeforeDate."
-                },
-                "beforeDate": {
-                    "type": "string",
-                    "format": "date-time",
-                    "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user.. If this is null it will use the defaultAfterDate."
-                }
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/org/apache/streams/google/gplus/GPlusConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/org/apache/streams/google/gplus/GPlusConfiguration.json b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/org/apache/streams/google/gplus/GPlusConfiguration.json
new file mode 100644
index 0000000..3b7795b
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/org/apache/streams/google/gplus/GPlusConfiguration.json
@@ -0,0 +1,101 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.google.gplus.GPlusConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "The protocol"
+        },
+        "host": {
+            "type": "string",
+            "description": "The host"
+        },
+        "port": {
+            "type": "integer",
+            "description": "The port"
+        },
+        "version": {
+            "type": "string",
+            "description": "The version"
+        },
+        "endpoint": {
+            "type": "string",
+            "description": "The endpoint"
+        },
+        "follow": {
+            "type": "array",
+            "description": "DEPRECATED. A list of user names, indicating the users whose activities should be delivered on the stream",
+            "items": {
+                "type": "string"
+            }
+        },
+        "googlePlusUsers": {
+            "type": "array",
+            "description": "A list of user ids and optional date parameters for the GPlus provider",
+            "items": {
+                "type": "object",
+                "$ref": "#/definitions/userInfo"
+            }
+        },
+        "defaultAfterDate": {
+            "type": "string",
+            "format": "date-time",
+            "description": "Optional parameter for the provider. If this value is not null and the afterDate value in the userInfo is null, this value will be used."
+        },
+        "defaultBeforeDate": {
+            "type": "string",
+            "format": "date-time",
+            "description": "Optional parameter for the provider. If this value is not null and the beforeDate value in the userInfo is null, this value will be used."
+        },
+        "oauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.google.gplus.GPlusOAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "description": "DEPRECATED",
+            "properties": {
+                "appName": {
+                    "type": "string"
+                },
+                "pathToP12KeyFile": {
+                    "type": "string",
+                    "description": "Absolute Path to key file"
+                },
+                "serviceAccountEmailAddress": {
+                    "type": "string",
+                    "description": "Service Account email address for your app"
+                }
+            }
+        }
+    },
+    "definitions": {
+        "userInfo": {
+            "type": "object",
+            "javaInterfaces" : ["java.io.Serializable"],
+            "dynamic": "true",
+            "javaType": "org.apache.streams.google.gplus.configuration.UserInfo",
+            "properties": {
+                "userId": {
+                    "type": "string",
+                    "description": "Google+ user id"
+                },
+                "afterDate": {
+                    "type": "string",
+                    "format": "date-time",
+                    "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultAfterDate."
+                },
+                "beforeDate": {
+                    "type": "string",
+                    "format": "date-time",
+                    "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user. If this is null it will use the defaultBeforeDate."
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java
deleted file mode 100644
index d223867..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusCommentSerDeIT.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.services.plus.model.Comment;
-import com.google.gplus.serializer.util.GPlusCommentDeserializer;
-import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-import org.apache.commons.lang.StringUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-
-/**
- * Tests conversion of gplus inputs to Activity.
- */
-public class GooglePlusCommentSerDeIT {
-  private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusCommentSerDeIT.class);
-  private ObjectMapper objectMapper;
-  private GooglePlusActivityUtil googlePlusActivityUtil;
-
-  /**
-   * setup.
-   */
-  @BeforeClass
-  public void setupTestCommentObjects() {
-    objectMapper = StreamsJacksonMapper.getInstance();
-    SimpleModule simpleModule = new SimpleModule();
-    simpleModule.addDeserializer(Comment.class, new GPlusCommentDeserializer());
-    objectMapper.registerModule(simpleModule);
-    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-    googlePlusActivityUtil = new GooglePlusActivityUtil();
-  }
-
-  @Test
-  public void testCommentObjects() {
-    InputStream is = GooglePlusCommentSerDeIT.class.getResourceAsStream("/google_plus_comments_jsons.txt");
-    InputStreamReader isr = new InputStreamReader(is);
-    BufferedReader br = new BufferedReader(isr);
-
-    Activity activity = new Activity();
-    List<Comment> comments = new ArrayList<>();
-
-    try {
-      while (br.ready()) {
-        String line = br.readLine();
-        if (!StringUtils.isEmpty(line)) {
-          LOGGER.info("raw: {}", line);
-          Comment comment = objectMapper.readValue(line, Comment.class);
-
-          LOGGER.info("comment: {}", comment);
-
-          assertNotNull(comment);
-          assertNotNull(comment.getEtag());
-          assertNotNull(comment.getId());
-          assertNotNull(comment.getInReplyTo());
-          assertNotNull(comment.getObject());
-          assertNotNull(comment.getPlusoners());
-          assertNotNull(comment.getPublished());
-          assertNotNull(comment.getUpdated());
-          assertNotNull(comment.getSelfLink());
-          assertEquals(comment.getVerb(), "post");
-
-          comments.add(comment);
-        }
-      }
-
-      assertEquals(comments.size(), 3);
-
-      GooglePlusActivityUtil.updateActivity(comments, activity);
-      assertNotNull(activity);
-      assertNotNull(activity.getObject());
-      assertEquals(activity.getObject().getAttachments().size(), 3);
-    } catch (Exception ex) {
-      LOGGER.error("Exception while testing serializability: {}", ex);
-    }
-  }
-
-  @Test
-  public void testEmptyComments() {
-    Activity activity = new Activity();
-
-    GooglePlusActivityUtil.updateActivity(new ArrayList<>(), activity);
-
-    assertNull(activity.getObject());
-  }
-}