You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/11/07 22:04:28 UTC

[2/5] incubator-streams git commit: Fixed Gplus provider and added user data/activity collectors

Fixed Gplus provider and added user data/activity collectors


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4e0e9dad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4e0e9dad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4e0e9dad

Branch: refs/heads/master
Commit: 4e0e9dad415d1767f0f6191377066be11bc645dc
Parents: 76763f8
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Thu Nov 6 17:08:34 2014 -0600
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Thu Nov 6 17:08:34 2014 -0600

----------------------------------------------------------------------
 .../google-gplus/pom.xml                        |  32 +++
 .../gplus/provider/AbstractGPlusProvider.java   | 234 ++++++++++++++++
 .../gplus/provider/GPlusActivitySerializer.java |   4 +-
 .../provider/GPlusHistoryProviderTask.java      | 106 --------
 .../google/gplus/provider/GPlusProvider.java    | 189 -------------
 .../provider/GPlusUserActivityCollector.java    | 130 +++++++++
 .../provider/GPlusUserActivityProvider.java     |  18 ++
 .../gplus/provider/GPlusUserDataCollector.java  | 101 +++++++
 .../gplus/provider/GPlusUserDataProvider.java   |  18 ++
 .../com/google/gplus/GPlusConfiguration.json    |  44 ++-
 .../provider/TestAbstractGPlusProvider.java     |  79 ++++++
 .../TestGPlusUserActivityCollector.java         | 268 +++++++++++++++++++
 .../provider/TestGPlusUserDataCollector.java    | 131 +++++++++
 streams-contrib/streams-provider-google/pom.xml |   6 +
 .../backoff/AbstractBackOffStrategy.java        |  15 +-
 15 files changed, 1071 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/pom.xml b/streams-contrib/streams-provider-google/google-gplus/pom.xml
index 0437408..a734791 100644
--- a/streams-contrib/streams-provider-google/google-gplus/pom.xml
+++ b/streams-contrib/streams-provider-google/google-gplus/pom.xml
@@ -79,6 +79,38 @@
             <artifactId>google-http-client-jackson2</artifactId>
             <version>1.17.0-rc</version>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.5.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.5.5</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.powermock</groupId>
+                    <artifactId>powermock-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.powermock</groupId>
+                    <artifactId>powermock-reflect</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
new file mode 100644
index 0000000..daa34d5
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.google.gplus.provider;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.auth.oauth2.GoogleTokenResponse;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.plus.Plus;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base {@link StreamsProvider} for Google+ data.
+ *
+ * <p>{@link #prepare(Object)} builds the {@link Plus} client, a single-threaded executor and a bounded
+ * datum queue; {@link #startStream()} submits one collector per configured user, obtained from
+ * {@link #getDataCollector(BackOffStrategy, BlockingQueue, Plus, UserInfo)}; {@link #readCurrent()}
+ * drains whatever the collectors have queued so far.</p>
+ */
+public abstract class AbstractGPlusProvider implements StreamsProvider {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGPlusProvider.class);
+    private static final String SCOPE = "https://www.googleapis.com/auth/plus.stream.read";
+    /** Upper bound on the number of datums handed back by a single {@link #readCurrent()} call. */
+    private static final int MAX_BATCH_SIZE = 1000;
+
+    private static final HttpTransport TRANSPORT = new NetHttpTransport();
+    private static final JacksonFactory JSON_FACTORY = new JacksonFactory();
+    private static final Gson GSON = new Gson();
+
+    private GPlusConfiguration config;
+    private ExecutorService executor;
+    private BlockingQueue<StreamsDatum> datumQueue;
+    private BlockingQueue<Runnable> runnables;
+    private AtomicBoolean isComplete;
+    private boolean previousPullWasEmpty;
+
+    protected GoogleClientSecrets clientSecrets;
+    protected GoogleCredential credential;
+    protected Plus plus;
+
+    /**
+     * Creates a provider configured from the "gplus" section of the global streams configuration.
+     */
+    public AbstractGPlusProvider() {
+        Config config = StreamsConfigurator.config.getConfig("gplus");
+        this.config = GPlusConfigurator.detectConfiguration(config);
+    }
+
+    /**
+     * Creates a provider that uses the supplied configuration.
+     * @param config configuration to use
+     */
+    public AbstractGPlusProvider(GPlusConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Submits one data collector per configured user and then shuts the executor down so that it
+     * terminates once all submitted collectors have finished. Users without their own after/before
+     * date receive the configured defaults, when those are set. {@link #prepare(Object)} must have
+     * been called first, because it creates the executor, queue and client used here.
+     */
+    @Override
+    public void startStream() {
+        BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
+        for (UserInfo user : this.config.getGooglePlusUsers()) {
+            if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) {
+                user.setAfterDate(this.config.getDefaultAfterDate());
+            }
+            if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) {
+                user.setBeforeDate(this.config.getDefaultBeforeDate());
+            }
+            this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.plus, user));
+        }
+        // No further submissions; isTerminated() turns true once the queued collectors complete.
+        this.executor.shutdown();
+    }
+
+    /**
+     * Creates the task that collects data for one user.
+     * @param strategy back off strategy passed to the collector
+     * @param queue queue the collector should place its datums on
+     * @param plus Google+ client
+     * @param userInfo user to collect for
+     * @return the collector to submit to the executor
+     */
+    protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo);
+
+    /**
+     * Moves up to {@link #MAX_BATCH_SIZE} queued datums into a new result set.
+     *
+     * <p>The provider is flagged as complete only when two consecutive calls each produce an empty
+     * batch while the queue is empty and the executor has terminated.</p>
+     * @return the datums collected since the previous call, possibly none
+     */
+    @Override
+    public StreamsResultSet readCurrent() {
+        BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
+        int batchCount = 0;
+        while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
+            StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
+            if (datum != null) {
+                ++batchCount;
+                ComponentUtils.offerUntilSuccess(datum, batch);
+            }
+        }
+        boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && this.executor.isTerminated();
+        this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
+        this.previousPullWasEmpty = pullIsEmpty;
+        return new StreamsResultSet(batch);
+    }
+
+    /**
+     * Not implemented by this provider.
+     * @return always {@code null}
+     */
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    /**
+     * Not implemented by this provider.
+     * @return always {@code null}
+     */
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    /**
+     * @return {@code true} until {@link #readCurrent()} has flagged the provider as complete
+     */
+    @Override
+    public boolean isRunning() {
+        return !this.isComplete.get();
+    }
+
+    /**
+     * Validates the OAuth settings, builds the Google+ client and initialises the executor, datum
+     * queue and completion state.
+     * @param configurationObject not read by this implementation; the configuration held by this
+     *                            provider is used instead
+     * @throws RuntimeException wrapping the {@link IOException} if the client cannot be created
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+        Preconditions.checkNotNull(config.getOauth().getAccessToken());
+        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+
+        try {
+            this.plus = createPlusClient();
+        } catch (IOException e) {
+            // The exception is passed as the Throwable argument, so no "{}" placeholder is needed.
+            LOGGER.error("Failed to create oauth for GPlus", e);
+            throw new RuntimeException(e);
+        }
+        // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one
+        // collector unless you have multiple oauth tokens
+        //TODO make this configurable based on the number of oauth tokens
+        this.executor = Executors.newFixedThreadPool(1);
+        this.datumQueue = new LinkedBlockingQueue<>(1000);
+        this.isComplete = new AtomicBoolean(false);
+        this.previousPullWasEmpty = false;
+    }
+
+    /**
+     * Builds the credential from the configured consumer key/secret and access token, refreshes
+     * the token and creates the {@link Plus} client with it.
+     * @return a new Google+ client
+     * @throws IOException if the token response cannot be parsed or the token refresh fails
+     */
+    @VisibleForTesting
+    protected Plus createPlusClient() throws IOException {
+        credential = new GoogleCredential.Builder()
+                .setJsonFactory(JSON_FACTORY)
+                .setTransport(TRANSPORT)
+                .setClientSecrets(config.getOauth().getConsumerKey(), config.getOauth().getConsumerSecret()).build()
+                .setFromTokenResponse(JSON_FACTORY.fromString(
+                        config.getOauth().getAccessToken(), GoogleTokenResponse.class));
+        credential.refreshToken();
+        return new Plus.Builder(TRANSPORT, JSON_FACTORY, credential).build();
+    }
+
+    /**
+     * Shuts the executor down and drops the reference to it.
+     */
+    @Override
+    public void cleanUp() {
+        // NOTE(review): the two 10s are presumably wait periods for the shutdown helper - confirm units.
+        ComponentUtils.shutdownExecutor(this.executor, 10, 10);
+        this.executor = null;
+    }
+
+    public GPlusConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(GPlusConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Set and overwrite the default before date that was read from the configuration file.
+     * @param defaultBeforeDate new default before date
+     */
+    public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
+        this.config.setDefaultBeforeDate(defaultBeforeDate);
+    }
+
+    /**
+     * Set and overwrite the default after date that was read from the configuration file.
+     * @param defaultAfterDate new default after date
+     */
+    public void setDefaultAfterDate(DateTime defaultAfterDate) {
+        this.config.setDefaultAfterDate(defaultAfterDate);
+    }
+
+    /**
+     * Sets and overwrites the user info from the configuration file. Every user receives the
+     * default before and after dates that are configured at the time of the call.
+     * @param userIds ids of the users to collect data for
+     */
+    public void setUserInfoWithDefaultDates(Set<String> userIds) {
+        List<UserInfo> gPlusUsers = Lists.newLinkedList();
+        for (String userId : userIds) {
+            UserInfo user = new UserInfo();
+            user.setUserId(userId);
+            user.setAfterDate(this.config.getDefaultAfterDate());
+            user.setBeforeDate(this.config.getDefaultBeforeDate());
+            gPlusUsers.add(user);
+        }
+        this.config.setGooglePlusUsers(gPlusUsers);
+    }
+
+    /**
+     * Sets and overwrites the user info from the configuration file. Only the after date is set
+     * for each user; no before date is assigned here.
+     * @param usersAndAfterDates user ids mapped to the after date to use for that user
+     */
+    public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) {
+        List<UserInfo> gPlusUsers = Lists.newLinkedList();
+        for (Map.Entry<String, DateTime> entry : usersAndAfterDates.entrySet()) {
+            UserInfo user = new UserInfo();
+            user.setUserId(entry.getKey());
+            user.setAfterDate(entry.getValue());
+            gPlusUsers.add(user);
+        }
+        this.config.setGooglePlusUsers(gPlusUsers);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
index 1659ae3..4991e94 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
@@ -32,9 +32,9 @@ public class GPlusActivitySerializer implements ActivitySerializer<com.google.ap
 
     private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
 
-    GPlusProvider provider;
+    AbstractGPlusProvider provider;
 
-    public GPlusActivitySerializer(GPlusProvider provider) {
+    public GPlusActivitySerializer(AbstractGPlusProvider provider) {
 
         this.provider = provider;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
deleted file mode 100644
index 46eb301..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.plus.Plus;
-import com.google.api.services.plus.model.ActivityFeed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class GPlusHistoryProviderTask implements Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(GPlusHistoryProviderTask.class);
-
-    private ObjectMapper mapper;
-
-    private GPlusProvider provider;
-    private String userid;
-    private String circle;
-
-    public GPlusHistoryProviderTask(GPlusProvider provider, String userid, String circle) {
-        this.provider = provider;
-        this.userid = userid;
-        this.circle = circle;
-    }
-
-    @Override
-    public void run() {
-
-        Plus.Activities.List listActivities = null;
-        try {
-            listActivities = provider.plus.activities().list(userid, circle);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        listActivities.setMaxResults(100L);
-
-// Execute the request for the first page
-        ActivityFeed activityFeed = null;
-        try {
-            activityFeed = listActivities.execute();
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-
-// Unwrap the request and extract the pieces we want
-        List<com.google.api.services.plus.model.Activity> activities = activityFeed.getItems();
-
-// Loop through until we arrive at an empty page
-        while (activities != null) {
-            for (com.google.api.services.plus.model.Activity gplusActivity : activities) {
-                String json = null;
-                try {
-                    json = mapper.writeValueAsString(gplusActivity);
-                } catch (JsonProcessingException e) {
-                    e.printStackTrace();
-                }
-                provider.inQueue.offer(json);
-            }
-
-            // We will know we are on the last page when the next page token is null.
-            // If this is the case, break.
-            if (activityFeed.getNextPageToken() == null) {
-                break;
-            }
-
-            // Prepare to request the next page of activities
-            listActivities.setPageToken(activityFeed.getNextPageToken());
-
-            // Execute and process the next page request
-            try {
-                activityFeed = listActivities.execute();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            activities = activityFeed.getItems();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
deleted file mode 100644
index 9257783..0000000
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.googleapis.auth.oauth2.GoogleTokenResponse;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.javanet.NetHttpTransport;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.plus.Plus;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.google.gplus.GPlusConfiguration;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.*;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class GPlusProvider implements StreamsProvider {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(GPlusProvider.class);
-
-    private GPlusConfiguration config;
-
-    private Class klass;
-
-    public GPlusConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(GPlusConfiguration config) {
-        this.config = config;
-    }
-
-    protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
-
-    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-
-    public BlockingQueue<Object> getInQueue() {
-        return inQueue;
-    }
-
-    private static final HttpTransport TRANSPORT = new NetHttpTransport();
-    private static final JacksonFactory JSON_FACTORY = new JacksonFactory();
-    private static final Gson GSON = new Gson();
-
-    protected GoogleClientSecrets clientSecrets;
-    protected GoogleCredential credential;
-    protected Plus plus;
-
-    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
-    ListenableFuture providerTaskComplete;
-
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
-
-    public GPlusProvider() {
-        Config config = StreamsConfigurator.config.getConfig("gplus");
-        this.config = GPlusConfigurator.detectConfiguration(config);
-    }
-
-    public GPlusProvider(GPlusConfiguration config) {
-        this.config = config;
-    }
-
-    public GPlusProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("gplus");
-        this.config = GPlusConfigurator.detectConfiguration(config);
-        this.klass = klass;
-    }
-
-    public GPlusProvider(GPlusConfiguration config, Class klass) {
-        this.config = config;
-        this.klass = klass;
-    }
-
-    @Override
-    public void startStream() {
-
-        providerTaskComplete = executor.submit(new GPlusHistoryProviderTask(this, "me", "public"));
-
-        for (int i = 0; i < 1; i++) {
-            new Thread(new GPlusEventProcessor(inQueue, providerQueue, klass));
-        }
-
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-
-        startStream();
-
-        while( !providerTaskComplete.isDone()) {
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) { }
-        }
-
-        return new StreamsResultSet(providerQueue);
-
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return !providerTaskComplete.isDone() && !providerTaskComplete.isCancelled();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-        Preconditions.checkNotNull(this.klass);
-
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
-        try {
-            credential = new GoogleCredential.Builder()
-                    .setJsonFactory(JSON_FACTORY)
-                    .setTransport(TRANSPORT)
-                    .setClientSecrets(config.getOauth().getConsumerKey(), config.getOauth().getConsumerSecret()).build()
-                    .setFromTokenResponse(JSON_FACTORY.fromString(
-                            config.getOauth().getAccessToken(), GoogleTokenResponse.class));
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public void cleanUp() {
-        for (int i = 0; i < 1; i++) {
-            inQueue.add(GPlusEventProcessor.TERMINATE);
-        }
-
-        try {
-            executor.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
new file mode 100644
index 0000000..c6952bf
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
@@ -0,0 +1,130 @@
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.ActivityFeed;
+import com.google.gplus.serializer.util.GPlusActivityDeserializer;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffException;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Pulls the public Google+ activities of one user page by page and puts each activity that falls
+ * inside the user's after/before date window onto the shared datum queue as a JSON string.
+ */
+public class GPlusUserActivityCollector implements Runnable {
+
+    /**
+     * Key for all public activities
+     * https://developers.google.com/+/api/latest/activities/list
+     */
+    private static final String PUBLIC_COLLECTION = "public";
+    /**
+     * Max results allowed per request
+     * https://developers.google.com/+/api/latest/activities/list
+     */
+    private static final long MAX_RESULTS = 100;
+    /** Maximum number of failed requests tolerated over the whole collection run. */
+    private static final int MAX_ATTEMPTS = 5;
+    private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityCollector.class);
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    static { //set up mapper for Google Activity Object
+        // NOTE(review): if StreamsJacksonMapper.getInstance() hands out a shared mapper, this
+        // registration and configuration change is visible to its other users - confirm.
+        SimpleModule simpleModule = new SimpleModule();
+        simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer());
+        MAPPER.registerModule(simpleModule);
+        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    private BlockingQueue<StreamsDatum> datumQueue;
+    private BackOffStrategy backOff;
+    private Plus gPlus;
+    private UserInfo userInfo;
+
+    /**
+     * @param gPlus Google+ client used for the requests
+     * @param datumQueue queue that receives the collected activities
+     * @param backOff strategy applied between retried requests
+     * @param userInfo user to collect for, including the optional after/before dates
+     */
+    public GPlusUserActivityCollector(Plus gPlus, BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo userInfo) {
+        this.gPlus = gPlus;
+        this.datumQueue = datumQueue;
+        this.backOff = backOff;
+        this.userInfo = userInfo;
+    }
+
+    @Override
+    public void run() {
+        collectActivityData();
+    }
+
+    /**
+     * Requests the user's public activity feed one page at a time until there is no next page, a
+     * non-retryable error is returned, or {@link #MAX_ATTEMPTS} failed requests have accumulated.
+     *
+     * <p>An activity is queued when it was published strictly after the after date and strictly
+     * before the before date; a missing date leaves that side of the window open. Paging stops at
+     * the first activity published before the after date, which presumes the feed is ordered
+     * newest first. Any failure other than a Google JSON error response ends the collection and is
+     * logged; an interrupt is restored on the current thread.</p>
+     */
+    protected void collectActivityData() {
+        try {
+            ActivityFeed feed = null;
+            boolean keepGoing;
+            int attempt = 0;
+            DateTime afterDate = userInfo.getAfterDate();
+            DateTime beforeDate = userInfo.getBeforeDate();
+            do {
+                try {
+                    // After a failed request 'feed' still holds the last good page (or null), so
+                    // a retry re-issues exactly the request that failed.
+                    if (feed == null) {
+                        feed = this.gPlus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION).setMaxResults(MAX_RESULTS).execute();
+                    } else {
+                        feed = this.gPlus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION).setMaxResults(MAX_RESULTS).setPageToken(feed.getNextPageToken()).execute();
+                    }
+                    this.backOff.reset(); //successful pull reset api.
+                    Iterable<Activity> items = feed.getItems();
+                    if (items != null) { // a page without items carries no list at all
+                        for (Activity activity : items) {
+                            DateTime published = new DateTime(activity.getPublished().getValue());
+                            boolean afterLowerBound = afterDate == null || afterDate.isBefore(published);
+                            boolean beforeUpperBound = beforeDate == null || beforeDate.isAfter(published);
+                            if (afterLowerBound && beforeUpperBound) {
+                                this.datumQueue.put(new StreamsDatum(MAPPER.writeValueAsString(activity), activity.getId()));
+                            } else if (afterDate != null && afterDate.isAfter(published)) {
+                                feed.setNextPageToken(null); // do not fetch next page
+                                break;
+                            }
+                        }
+                    }
+                    // Decided afresh after every successful page, so an earlier retry can never
+                    // push the loop past the last page and back to the first one.
+                    keepGoing = feed.getNextPageToken() != null;
+                } catch (GoogleJsonResponseException gjre) {
+                    switch (gjre.getStatusCode()) {
+                        case 400 :
+                            LOGGER.warn("Bad Request for user={} : {}", userInfo.getUserId(), gjre);
+                            keepGoing = false;
+                            break;
+                        case 401 :
+                            LOGGER.warn("Invalid Credentials", gjre);
+                            keepGoing = false;
+                            break; // must not fall through into the rate-limit retry below
+                        case 403 :
+                            LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage());
+                            this.backOff.backOff();
+                            keepGoing = true;
+                            break;
+                        case 503 :
+                            LOGGER.warn("Google Backend Service Error", gjre);
+                            keepGoing = false;
+                            break;
+                        default:
+                            LOGGER.warn("Google Service returned error", gjre);
+                            this.backOff.backOff();
+                            keepGoing = true;
+                            break;
+                    }
+                    ++attempt;
+                }
+            } while (keepGoing && attempt < MAX_ATTEMPTS);
+        } catch (Throwable t) {
+            if (t instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), t);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
new file mode 100644
index 0000000..e7f1bec
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
@@ -0,0 +1,18 @@
+package com.google.gplus.provider;
+
+import com.google.api.services.plus.Plus;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * {@link AbstractGPlusProvider} whose data collectors are {@link GPlusUserActivityCollector} instances.
+ */
+public class GPlusUserActivityProvider extends AbstractGPlusProvider{
+    /**
+     * Creates the collector for one user.
+     *
+     * @param strategy back off strategy handed to the collector
+     * @param queue queue handed to the collector
+     * @param plus Google+ client handed to the collector
+     * @param userInfo user handed to the collector
+     * @return a new {@link GPlusUserActivityCollector} built from the given arguments
+     */
+    @Override
+    protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
+        // Note the constructor's argument order: (plus, queue, strategy, userInfo).
+        return new GPlusUserActivityCollector(plus, queue, strategy, userInfo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
new file mode 100644
index 0000000..6a5ce49
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
@@ -0,0 +1,101 @@
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.Person;
+import com.google.gplus.serializer.util.GPlusPersonDeserializer;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * {@link Runnable} that fetches the Google+ profile ({@link Person}) of a single user and puts it,
+ * serialized as a JSON string, on the supplied datum queue.
+ *
+ * The request is attempted at most {@code MAX_ATTEMPTS} times. Only 403 responses and otherwise
+ * unclassified error responses are retried, after waiting on the {@link BackOffStrategy};
+ * 400, 401 and 503 responses end the attempt loop immediately.
+ */
+public class GPlusUserDataCollector implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataCollector.class);
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private static final int MAX_ATTEMPTS = 5;
+
+    // Set up the mapper for Person objects.
+    // NOTE(review): MAPPER comes from StreamsJacksonMapper.getInstance(); if that instance is shared,
+    // this configuration is presumably visible to every other user of it - confirm.
+    static {
+        SimpleModule simpleModule = new SimpleModule();
+        simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
+        MAPPER.registerModule(simpleModule);
+        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    private final BackOffStrategy backOffStrategy;
+    private final Plus gPlus;
+    private final BlockingQueue<StreamsDatum> datumQueue;
+    private final UserInfo userInfo;
+
+    /**
+     * @param gPlus Google+ client used to request the profile
+     * @param backOffStrategy strategy used to wait between retried requests
+     * @param datumQueue queue that receives the resulting datum
+     * @param userInfo user whose profile is requested
+     */
+    public GPlusUserDataCollector(Plus gPlus, BackOffStrategy backOffStrategy, BlockingQueue<StreamsDatum> datumQueue, UserInfo userInfo) {
+        this.gPlus = gPlus;
+        this.backOffStrategy = backOffStrategy;
+        this.datumQueue = datumQueue;
+        this.userInfo = userInfo;
+    }
+
+    /**
+     * Requests the user's profile and, on success, queues one {@link StreamsDatum} whose document is
+     * the profile as JSON and whose id is the person's id. Failures are logged, never thrown; if the
+     * thread is interrupted its interrupt flag is restored.
+     */
+    protected void queueUserHistory() {
+        try {
+            boolean tryAgain = false;
+            int attempts = 0;
+            Person person = null;
+            do {
+                try {
+                    person = this.gPlus.people().get(userInfo.getUserId()).execute();
+                    this.backOffStrategy.reset();
+                    tryAgain = person == null;
+                } catch (GoogleJsonResponseException gjre) {
+                    switch (gjre.getStatusCode()) {
+                        case 400 :
+                            LOGGER.warn("Bad Request for user={} : {}", userInfo.getUserId(), gjre);
+                            tryAgain = false;
+                            break;
+                        case 401 :
+                            LOGGER.warn("Invalid Credentials : {}", gjre);
+                            tryAgain = false;
+                            // Without this break the 401 case fell through into the 403 handling
+                            // and invalid credentials were backed off and retried.
+                            break;
+                        case 403 :
+                            LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage());
+                            this.backOffStrategy.backOff();
+                            tryAgain = true;
+                            break;
+                        case 503 :
+                            LOGGER.warn("Google Backend Service Error : {}", gjre);
+                            tryAgain = false;
+                            break;
+                        default:
+                            LOGGER.warn("Google Service returned error : {}", gjre);
+                            tryAgain = true;
+                            this.backOffStrategy.backOff();
+                            break;
+                    }
+                }
+                ++attempts;
+            } while(tryAgain && attempts < MAX_ATTEMPTS);
+            if(person == null) {
+                // Every attempt failed; report that explicitly instead of running into a
+                // NullPointerException on person.getId() below.
+                LOGGER.warn("Unable to pull user data for user={} after {} attempt(s)", userInfo.getUserId(), attempts);
+                return;
+            }
+            this.datumQueue.put(new StreamsDatum(MAPPER.writeValueAsString(person), person.getId()));
+        } catch (Throwable t) {
+            // Deliberate best effort: a failure for one user is logged and must not escape run().
+            LOGGER.warn("Unable to pull user data for user={} : {}", userInfo.getUserId(), t);
+            if(t instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs {@link #queueUserHistory()}.
+     */
+    @Override
+    public void run() {
+        queueUserHistory();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
new file mode 100644
index 0000000..0e2782d
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
@@ -0,0 +1,18 @@
+package com.google.gplus.provider;
+
+import com.google.api.services.plus.Plus;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * {@link AbstractGPlusProvider} whose data collectors are {@link GPlusUserDataCollector} instances.
+ */
+public class GPlusUserDataProvider extends AbstractGPlusProvider{
+    /**
+     * Creates the collector for one user.
+     *
+     * @param strategy back off strategy handed to the collector
+     * @param queue queue handed to the collector
+     * @param plus Google+ client handed to the collector
+     * @param userInfo user handed to the collector
+     * @return a new {@link GPlusUserDataCollector} built from the given arguments
+     */
+    @Override
+    protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
+        // Note the constructor's argument order: (plus, strategy, queue, userInfo).
+        return new GPlusUserDataCollector(plus, strategy, queue, userInfo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
index e2d8130..32f6a72 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
@@ -26,11 +26,29 @@
         },
         "follow": {
             "type": "array",
-            "description": "A list of user names, indicating the users whose activities should be delivered on the stream",
+            "description": "DEPRECATED. A list of user names, indicating the users whose activities should be delivered on the stream",
             "items": {
                 "type": "string"
             }
         },
+        "googlePlusUsers": {
+            "type": "array",
+            "description": "A list of user ids and optional date parameters for the GPlus provider",
+            "items": {
+                "type": "object",
+                "$ref": "#/definitions/userInfo"
+            }
+        },
+        "defaultAfterDate": {
+            "type": "string",
+            "format": "date-time",
+            "description": "Optional parameter for the provider. If this value is not null and the afterDate value in the userInfo is null, this value will be used."
+        },
+        "defaultBeforeDate": {
+            "type": "string",
+            "format": "date-time",
+            "description": "Optional parameter for the provider. If this value is not null and the beforeDate value in the userInfo is null, this value will be used."
+        },
         "oauth": {
             "type": "object",
             "dynamic": "true",
@@ -54,5 +72,29 @@
                 }
             }
         }
+    },
+    "definitions": {
+        "userInfo": {
+            "type": "object",
+            "javaInterfaces" : ["java.io.Serializable"],
+            "dynamic": "true",
+            "javaType": "org.apache.streams.google.gplus.configuration.UserInfo",
+            "properties": {
+                "userId": {
+                    "type": "string",
+                    "description": "Google+ user id"
+                },
+                "afterDate": {
+                    "type": "string",
+                    "format": "date-time",
+                    "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultAfterDate."
+                },
+                "beforeDate": {
+                    "type": "string",
+                    "format": "date-time",
+                    "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user. If this is null it will use the defaultBeforeDate."
+                }
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
new file mode 100644
index 0000000..80b1b23
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
@@ -0,0 +1,79 @@
+package com.google.gplus.provider;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.api.services.plus.Plus;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.GPlusOAuthConfiguration;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit test for {@link AbstractGPlusProvider}.
+ */
+public class TestAbstractGPlusProvider extends RandomizedTest{
+
+    /**
+     * Configures the provider with a random number of users and a stub collector that puts exactly
+     * one datum on the queue per run, then checks that the number of datums read while the provider
+     * reports running equals the number of users.
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testDataCollectorRunsPerUser() {
+        final int expectedDatums = randomIntBetween(1, 1000);
+        final List<UserInfo> users = Lists.newLinkedList();
+        while(users.size() < expectedDatums) {
+            users.add(new UserInfo());
+        }
+
+        final GPlusOAuthConfiguration credentials = new GPlusOAuthConfiguration();
+        credentials.setAccessToken("a");
+        credentials.setConsumerKey("a");
+        credentials.setConsumerSecret("a");
+        credentials.setAccessTokenSecret("a");
+
+        final GPlusConfiguration gplusConfig = new GPlusConfiguration();
+        gplusConfig.setOauth(credentials);
+        gplusConfig.setGooglePlusUsers(users);
+
+        final AbstractGPlusProvider provider = new AbstractGPlusProvider(gplusConfig) {
+
+            @Override
+            protected Plus createPlusClient() throws IOException {
+                return mock(Plus.class);
+            }
+
+            @Override
+            protected Runnable getDataCollector(BackOffStrategy strategy, final BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
+                // Stub collector: one datum per user, no call to the Google+ API.
+                return new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            queue.put(new StreamsDatum(null));
+                        } catch (InterruptedException ie) {
+                            fail("Test was interrupted");
+                        }
+                    }
+                };
+            }
+        };
+
+        try {
+            provider.prepare(null);
+            provider.startStream();
+            int received = 0;
+            while(provider.isRunning()) {
+                received += provider.readCurrent().size();
+            }
+            assertEquals(expectedDatums, received);
+        } finally {
+            provider.cleanUp();
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
new file mode 100644
index 0000000..1484af5
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
@@ -0,0 +1,268 @@
+package com.google.gplus.provider;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.ActivityFeed;
+import com.google.common.collect.Lists;
+import com.google.gplus.serializer.util.GPlusActivityDeserializer;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link com.google.gplus.provider.GPlusUserActivityCollector}
+ */
+public class TestGPlusUserActivityCollector extends RandomizedTest {
+
+
+    private static final String ACTIVITY_TEMPLATE = "{ \"kind\": \"plus#activity\", \"etag\": \"\\\"Vea_b94Y77GDGgRK7gFNPnolKQw/v1-6aVSBGT4qiStMoz7f2_AN2fM\\\"\", \"title\": \"\", \"published\": \"%s\", \"updated\": \"2014-10-27T06:26:33.927Z\", \"id\": \"z13twrlznpvtzz52w22mdt1y0k3of1djw04\", \"url\": \"https://plus.google.com/116771159471120611293/posts/GR7CGR8N5VL\", \"actor\": { \"id\": \"116771159471120611293\", \"displayName\": \"Matt Neithercott\", \"url\": \"https://plus.google.com/116771159471120611293\", \"image\": { \"url\": \"https://lh6.googleusercontent.com/-C0fiZBxdvw0/AAAAAAAAAAI/AAAAAAAAJ5k/K4pgR3_-_ms/photo.jpg?sz=50\" } }, \"verb\": \"share\", \"object\": { \"objectType\": \"activity\", \"id\": \"z13zgvtiurjgfti1v234iflghvq2c1dge04\", \"actor\": { \"id\": \"104954254300557350002\", \"displayName\": \"Adam Balm\", \"url\": \"https://plus.google.com/104954254300557350002\", \"image\": { \"url\": \"https://lh4.googleusercontent.com/-SO1scj4p2LA/AAAAAAAAAAI/AAAAAAAAI-
 s/efA9LBVe144/photo.jpg?sz=50\" } }, \"content\": \"\", \"url\": \"https://plus.google.com/104954254300557350002/posts/AwewXhtn7ws\", \"replies\": { \"totalItems\": 0, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/comments\" }, \"plusoners\": { \"totalItems\": 9, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/plusoners\" }, \"resharers\": { \"totalItems\": 0, \"selfLink\": \"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/resharers\" }, \"attachments\": [ { \"objectType\": \"photo\", \"id\": \"104954254300557350002.6074732746360957410\", \"content\": \"26/10/2014 - 1\", \"url\": \"https://plus.google.com/photos/104954254300557350002/albums/6074732747132702225/6074732746360957410\", \"image\": { \"url\": \"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w506-h750/2014%2B-%2B1\", \"type\": 
 \"image/jpeg\" }, \"fullImage\": { \"url\": \"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w600-h1141/2014%2B-%2B1\", \"type\": \"image/jpeg\", \"height\": 1141, \"width\": 600 } } ] }, \"annotation\": \"Truth 😜\", \"provider\": { \"title\": \"Reshared Post\" }, \"access\": { \"kind\": \"plus#acl\", \"description\": \"Public\", \"items\": [ { \"type\": \"public\" } ] } }";
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private static final String IN_RANGE_IDENTIFIER = "data in range";
+
+
+    // Registers the Google+ Activity deserializer on the mapper and makes it ignore unknown JSON properties.
+    // NOTE(review): MAPPER comes from StreamsJacksonMapper.getInstance(); if that instance is shared,
+    // this configuration is presumably visible to every other user of it - confirm.
+    static {
+        SimpleModule simpleModule = new SimpleModule();
+        simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer());
+        MAPPER.registerModule(simpleModule);
+        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /**
+     * Creates a randomized number of activities and a randomized (possibly open-ended or absent) date range.
+     * The activity feed is separated into three chunks,
+     * |. . . data too recent to be in date range . . .||. . . data in date range. . .||. . . data too old to be in date range|
+     * [index 0, ............................................................................................., index length-1]
+     * Inside of those chunks data has no order, but the list is ordered by those three chunks.
+     *
+     * The test checks that exactly the activities inside the date range make it onto the output queue.
+     * In the counters below, "before range" means older than afterDate and "after range" means
+     * more recent than beforeDate.
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testWithBeforeAndAfterDates() throws InterruptedException {
+        //initialize counts assuming no date ranges will be used
+        int numActivities = randomIntBetween(0, 1000);
+        int numActivitiesInDateRange = numActivities;
+        int numberOutOfRange = 0;
+        int numBerforeRange = 0;
+        int numAfterRange = 0;
+        //determine if date ranges will be used; each bound is enabled independently at random
+        DateTime beforeDate = null;
+        DateTime afterDate = null;
+        if(randomInt() % 2 == 0) {
+            beforeDate = DateTime.now().minusDays(randomIntBetween(1,5));
+        }
+        if(randomInt() % 2 == 0) {
+            if(beforeDate == null) {
+                afterDate = DateTime.now().minusDays(randomIntBetween(1, 10));
+            } else {
+                afterDate = beforeDate.minusDays(randomIntBetween(1, 10)); //keep the start of the range before its end
+            }
+        }
+        //update counts if date ranges are going to be used.
+        if(beforeDate != null || afterDate != null ) { //assign amount to be in range
+            numActivitiesInDateRange = randomIntBetween(0, numActivities);
+            numberOutOfRange = numActivities - numActivitiesInDateRange;
+        }
+        if(beforeDate == null && afterDate != null) { //only a start of range: all out of range data is older than the start
+            numBerforeRange = numberOutOfRange;
+        } else if(beforeDate != null && afterDate == null) { //only an end of range: all out of range data is more recent than the end
+            numAfterRange = numberOutOfRange;
+        } else if(beforeDate != null && afterDate != null) { //assign half before range and half after the range (odd remainder goes after)
+            numAfterRange = (numberOutOfRange / 2) + (numberOutOfRange % 2);
+            numBerforeRange = numberOutOfRange / 2;
+        }
+
+        Plus plus = createMockPlus(numBerforeRange, numAfterRange, numActivitiesInDateRange, afterDate, beforeDate);
+        BackOffStrategy strategy = new ConstantTimeBackOffStrategy(1);
+        BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
+        UserInfo userInfo = new UserInfo();
+        userInfo.setUserId("A");
+        userInfo.setAfterDate(afterDate);
+        userInfo.setBeforeDate(beforeDate);
+        GPlusUserActivityCollector collector = new GPlusUserActivityCollector(plus, datums, strategy, userInfo);
+        collector.run(); //runs synchronously on the test thread
+
+        assertEquals(numActivitiesInDateRange, datums.size());
+        while(!datums.isEmpty()) {
+            StreamsDatum datum = datums.take();
+            assertNotNull(datum);
+            assertNotNull(datum.getDocument());
+            assertTrue(datum.getDocument() instanceof String);
+            assertTrue(((String)datum.getDocument()).contains(IN_RANGE_IDENTIFIER)); //only in range documents are on the out going queue.
+        }
+    }
+
+
+    /**
+     * Builds a mock {@link Plus} client whose {@code activities()} call always returns the same
+     * mocked {@link Plus.Activities} created from the given counts and date range.
+     */
+    private Plus createMockPlus(final int numBefore, final int numAfter, final int numInRange, final DateTime after, final DateTime before) {
+        final Plus mockPlus = mock(Plus.class);
+        final Plus.Activities mockActivities = createMockPlusActivities(numBefore, numAfter, numInRange, after, before);
+        when(mockPlus.activities()).thenReturn(mockActivities);
+        return mockPlus;
+    }
+
+    private Plus.Activities createMockPlusActivities(final int numBefore, final int numAfter, final int numInRange, final DateTime after, final DateTime before) {
+        Plus.Activities activities = mock(Plus.Activities.class);
+        try {
+            Plus.Activities.List list = createMockPlusActivitiesList(numBefore, numAfter, numInRange, after, before);
+            when(activities.list(anyString(), anyString())).thenReturn(list);
+        } catch (IOException ioe) {
+            fail("Should not have thrown exception while creating mock. : "+ioe.getMessage());
+        }
+        return activities;
+    }
+
+    private Plus.Activities.List createMockPlusActivitiesList(final int numBefore, final int numAfter, final int numInRange, final DateTime after, final DateTime before) {
+        Plus.Activities.List list = mock(Plus.Activities.List.class);
+        when(list.setMaxResults(anyLong())).thenReturn(list);
+        when(list.setPageToken(anyString())).thenReturn(list);
+        ActivityFeedAnswer answer = new ActivityFeedAnswer(numBefore, numAfter, numInRange, after, before);
+        try {
+            doAnswer(answer).when(list).execute();
+        } catch (IOException ioe) {
+            fail("Should not have thrown exception while creating mock. : "+ioe.getMessage());
+        }
+        return list;
+    }
+
+
+    private static ActivityFeed createMockActivityFeed(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before, boolean page) {
+        ActivityFeed feed = new ActivityFeed();
+        List<Activity> list = Lists.newLinkedList();
+        for(int i=0; i < numAfter; ++i) {
+            DateTime published = before.plus(randomIntBetween(0, Integer.MAX_VALUE));
+            Activity activity = createActivityWithPublishedDate(published);
+            list.add(activity);
+        }
+        for(int i=0; i < numInRange; ++i) {
+            DateTime published = null;
+            if((before == null && after == null) || before == null) {
+                published = DateTime.now(); // no date range or end time date range so just make the time now.
+            } else if(after == null) {
+                published = before.minusMillis(randomIntBetween(1, Integer.MAX_VALUE)); //no beginning to range
+            } else { // has to be in range
+                long range = before.getMillis() - after.getMillis();
+                published = after.plus(range / 2); //in the middle
+            }
+            Activity activity = createActivityWithPublishedDate(published);
+            activity.setTitle(IN_RANGE_IDENTIFIER);
+            list.add(activity);
+        }
+        for(int i=0; i < numBefore; ++i) {
+            DateTime published = after.minusMillis(randomIntBetween(1, Integer.MAX_VALUE));
+            Activity activity = createActivityWithPublishedDate(published);
+            list.add(activity);
+        }
+        if(page) {
+            feed.setNextPageToken("A");
+        } else {
+            feed.setNextPageToken(null);
+        }
+        feed.setItems(list);
+        return feed;
+    }
+
+    private static Activity createActivityWithPublishedDate(DateTime dateTime) {
+        Activity activity = new Activity();
+        activity.setPublished(new com.google.api.client.util.DateTime(dateTime.getMillis()));
+        activity.setId("a");
+        return activity;
+    }
+
+    /**
+     * Mockito {@link Answer} that serves the configured activities page by page. Every call hands
+     * out at most {@code maxBatch} activities, taking first from the "after range" activities, then
+     * from the in-range ones and finally from the "before range" ones, and remembers how many of each
+     * kind have already been served.
+     */
+    private static class ActivityFeedAnswer implements Answer<ActivityFeed> {
+        // Number of activities of each kind already handed out by earlier calls.
+        private int afterCount = 0;
+        private int beforeCount = 0;
+        private int inCount = 0;
+        // Maximum number of activities in a single feed page.
+        private final int maxBatch = 100;
+
+        private final int numAfter;
+        private final int numInRange;
+        private final int numBefore;
+        private final DateTime after;
+        private final DateTime before;
+
+        /**
+         * @param numBefore total number of activities older than the range
+         * @param numAfter total number of activities more recent than the range
+         * @param numInRange total number of activities inside the range
+         * @param after start of the date range, may be null
+         * @param before end of the date range, may be null
+         */
+        private ActivityFeedAnswer(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before) {
+            this.numBefore = numBefore;
+            this.numAfter = numAfter;
+            this.numInRange = numInRange;
+            this.after = after;
+            this.before = before;
+        }
+
+        /**
+         * Produces the next feed page. The page carries a next page token as long as any activity
+         * of any kind is still left to serve after this call.
+         */
+        @Override
+        public ActivityFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+            int capacity = maxBatch; // room left in this page
+
+            int batchAfter = Math.min(numAfter - afterCount, capacity);
+            afterCount += batchAfter;
+            capacity -= batchAfter;
+
+            int batchIn = Math.min(numInRange - inCount, capacity);
+            inCount += batchIn;
+            capacity -= batchIn;
+
+            // Limit by what is still left (numBefore - beforeCount). The previous comparison used the
+            // per-call batch counter, which is always 0 here, so a later page could serve more than
+            // was left, push beforeCount past numBefore and keep the "more pages" flag true forever.
+            int batchBefore = Math.min(numBefore - beforeCount, capacity);
+            beforeCount += batchBefore;
+
+            boolean morePages = numAfter != afterCount || inCount != numInRange || beforeCount != numBefore;
+            return createMockActivityFeed(batchBefore, batchAfter, batchIn, after, before, morePages);
+        }
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
new file mode 100644
index 0000000..784e1b5
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
@@ -0,0 +1,131 @@
+package com.google.gplus.provider;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.Person;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Basic unit tests for {@link com.google.gplus.provider.GPlusUserDataCollector}
+ */
+public class TestGPlusUserDataCollector {
+
+    /** Id set on the mocked {@link Person}; the success test expects it as the datum id. */
+    private static final String NO_ERROR = "no error";
+
+    /**
+     * Test that on success a datum will be added to the queue.
+     * @throws Exception if taking the datum from the queue is interrupted
+     */
+    @Test
+    public void testSucessfullPull() throws Exception {
+        Plus plus = createMockPlus(0, null);
+        BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
+        BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
+        UserInfo user = new UserInfo();
+        user.setUserId("A");
+
+        GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, backOff, datums, user);
+        collector.run();
+
+        assertEquals(1, datums.size());
+        StreamsDatum datum = datums.take();
+        assertNotNull(datum);
+        assertEquals(NO_ERROR, datum.getId());
+        assertNotNull(datum.getDocument());
+        assertTrue(datum.getDocument() instanceof String);
+    }
+
+    /**
+     * Test that on failure, no datums are output
+     * @throws Exception on any unexpected failure, which fails the test
+     */
+    @Test
+    public void testFail() throws Exception {
+        Plus plus = createMockPlus(3, mock(GoogleJsonResponseException.class));
+        UserInfo user = new UserInfo();
+        user.setUserId("A");
+        BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
+        BackOffStrategy backOffStrategy = new ConstantTimeBackOffStrategy(1);
+
+        GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, backOffStrategy, datums, user);
+        collector.run();
+
+        assertEquals(0, datums.size());
+    }
+
+    /** Mocks a {@link Plus} client whose {@code people()} hands back a newly built People mock per call. */
+    private Plus createMockPlus(final int succedOnTry, final Throwable throwable) {
+        Plus plus = mock(Plus.class);
+        doAnswer(new Answer<Plus.People>() {
+            @Override
+            public Plus.People answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return createMockPeople(succedOnTry, throwable);
+            }
+        }).when(plus).people();
+        return plus;
+    }
+
+    /** Mocks People so that {@code get(anyString())} answers with a newly built Get mock on every call. */
+    private Plus.People createMockPeople(final int succedOnTry, final Throwable throwable) {
+        Plus.People people = mock(Plus.People.class);
+        try {
+            when(people.get(anyString())).thenAnswer(new Answer<Plus.People.Get>() {
+                @Override
+                public Plus.People.Get answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    return createMockGetNoError(succedOnTry, throwable);
+                }
+            });
+        } catch (IOException ioe) {
+            fail("No Excpetion should have been thrown while creating mocks");
+        }
+        return people;
+    }
+
+    /**
+     * Mocks a Get whose {@code execute()} throws {@code throwable} for its first {@code succedOnTry}
+     * calls and afterwards returns a Person with id {@link #NO_ERROR}; the counter is per Get mock.
+     */
+    private Plus.People.Get createMockGetNoError(final int succedOnTry, final Throwable throwable) {
+        Plus.People.Get get = mock(Plus.People.Get.class);
+        try {
+            doAnswer(new Answer<Person>() {
+                private int counter = 0;
+
+                @Override
+                public Person answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    if (counter != succedOnTry) {
+                        ++counter;
+                        throw throwable;
+                    }
+                    Person p = new Person();
+                    p.setId(NO_ERROR);
+                    return p;
+                }
+            }).when(get).execute();
+        } catch (IOException ioe) {
+            fail("No Excpetion should have been thrown while creating mocks");
+        }
+        return get;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-contrib/streams-provider-google/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/pom.xml b/streams-contrib/streams-provider-google/pom.xml
index b720b76..99cd158 100644
--- a/streams-contrib/streams-provider-google/pom.xml
+++ b/streams-contrib/streams-provider-google/pom.xml
@@ -39,6 +39,12 @@
                 <artifactId>streams-pojo</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-all</artifactId>
+                <version>${mockito.version}</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e0e9dad/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
index 45e4239..a68a94a 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
@@ -14,6 +14,8 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.util.api.requests.backoff;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * @see org.apache.streams.util.api.requests.backoff.BackOffStrategy
  */
@@ -22,7 +24,7 @@ public abstract class AbstractBackOffStrategy implements BackOffStrategy {
     private long baseSleepTime;
     private long lastSleepTime;
     private int maxAttempts;
-    private int attemptsCount;
+    private AtomicInteger attemptsCount;
 
     /**
      * A BackOffStrategy that can effectively be used endlessly.
@@ -46,16 +48,17 @@ public abstract class AbstractBackOffStrategy implements BackOffStrategy {
         }
         this.baseSleepTime = baseBackOffTime;
         this.maxAttempts = maximumNumberOfBackOffAttempts;
-        this.attemptsCount = 0;
+        this.attemptsCount = new AtomicInteger(0);
     }
 
     @Override
     public void backOff() throws BackOffException {
-        if(this.attemptsCount++ >= this.maxAttempts && this.maxAttempts != -1) {
-            throw new BackOffException(this.attemptsCount-1, this.lastSleepTime);
+        int attempt = this.attemptsCount.getAndIncrement();
+        if(attempt >= this.maxAttempts && this.maxAttempts != -1) {
+            throw new BackOffException(attempt, this.lastSleepTime);
         } else {
             try {
-                Thread.sleep(this.lastSleepTime = calculateBackOffTime(this.attemptsCount, this.baseSleepTime));
+                Thread.sleep(this.lastSleepTime = calculateBackOffTime(attempt, this.baseSleepTime));
             } catch (InterruptedException ie) {
                 Thread.currentThread().interrupt();
             }
@@ -64,7 +67,7 @@ public abstract class AbstractBackOffStrategy implements BackOffStrategy {
 
     @Override
     public void reset() {
-        this.attemptsCount = 0;
+        this.attemptsCount.set(0);
     }
 
     /**