You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:40 UTC

[62/71] [abbrv] adding google provider, tweaks to localbuilder and hdfs reader

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/pom.xml b/streams-contrib/streams-provider-google/google-gplus/pom.xml
new file mode 100644
index 0000000..99f9321
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-provider-google</artifactId>
+        <version>0.1.STREAMS26-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>google-gplus</artifactId>
+
+    <repositories>
+        <repository>
+            <id>typesafe</id>
+            <name>typesafe</name>
+            <url>https://repo.typesafe.com/typesafe/repo</url> <!-- HTTPS: recent Maven releases block plain-HTTP repositories -->
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path-assert</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.apis</groupId>
+            <artifactId>google-api-services-plus</artifactId>
+            <version>v1-rev118-1.17.0-rc</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.api-client</groupId>
+            <artifactId>google-api-client</artifactId>
+            <version>1.17.0-rc</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.http-client</groupId>
+            <artifactId>google-http-client-jackson2</artifactId>
+            <version>1.17.0-rc</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/com/google/gplus/GPlusConfiguration.json</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.google.gplus.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>false</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
new file mode 100644
index 0000000..9bb6350
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
@@ -0,0 +1,117 @@
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Converts Google+ API activities into Streams {@link Activity} objects.
+ * Only {@link #deserialize} is implemented; {@link #serialize} and {@link #deserializeAll}
+ * throw {@link NotImplementedException}.
+ */
+public class GPlusActivitySerializer implements ActivitySerializer<com.google.api.services.plus.model.Activity> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
+
+    GPlusProvider provider;
+
+    ObjectMapper mapper = new ObjectMapper();
+
+    /** Binds the serializer to {@code provider} and lets the mapper tolerate empty beans. */
+    public GPlusActivitySerializer(GPlusProvider provider) {
+        this.provider = provider;
+        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
+    }
+
+    /** Creates an unbound serializer whose mapper is configured like the bound one. */
+    public GPlusActivitySerializer() {
+        this(null);
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "gplus.v1";
+    }
+
+    @Override
+    public com.google.api.services.plus.model.Activity serialize(Activity deserialized) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+
+    /**
+     * Copies id, published time, actor, verb, title and object fields onto a new Activity.
+     * A missing published timestamp, actor or object is skipped instead of raising an NPE.
+     */
+    @Override
+    public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) {
+        Activity activity = new Activity();
+        activity.setId(formatId(gplusActivity.getId()));
+        if (gplusActivity.getPublished() != null) {
+            activity.setPublished(new Date(gplusActivity.getPublished().getValue()));
+        }
+        Provider provider = new Provider();
+        provider.setId("http://plus.google.com");
+        provider.setDisplayName("GPlus");
+        activity.setProvider(provider);
+        if (gplusActivity.getActor() != null) {
+            Actor actor = new Actor();
+            actor.setId(gplusActivity.getActor().getId());
+            actor.setDisplayName(gplusActivity.getActor().getDisplayName());
+            actor.setUrl(gplusActivity.getActor().getUrl());
+            activity.setActor(actor);
+        }
+        activity.setVerb(gplusActivity.getVerb());
+        activity.setTitle(gplusActivity.getTitle());
+        if (gplusActivity.getObject() != null) {
+            ActivityObject object = new ActivityObject();
+            object.setId(gplusActivity.getObject().getId());
+            object.setUrl(gplusActivity.getObject().getUrl());
+            object.setContent(gplusActivity.getObject().getContent());
+            activity.setContent(gplusActivity.getObject().getContent());
+            activity.setObject(object);
+        }
+        return activity;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+
+    public static Generator buildGenerator(ObjectNode event) {
+        return null;
+    }
+
+    public static Icon getIcon(ObjectNode event) {
+        return null;
+    }
+
+    public static Provider buildProvider(ObjectNode event) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:gplus");
+        return provider;
+    }
+
+    public static List<Object> getLinks(ObjectNode event) {
+        return null;
+    }
+
+    public static String getUrls(ObjectNode event) {
+        return null;
+    }
+
+    /** Joins the given parts with ':' behind the "id:gplus" prefix (was a copy-pasted "id:gmail"). */
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:gplus", idparts));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
new file mode 100644
index 0000000..9cd435d
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
@@ -0,0 +1,36 @@
+package com.google.gplus.provider;
+
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.GPlusOAuthConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Builds a {@link GPlusConfiguration} from typesafe {@link Config} settings. */
+public class GPlusConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GPlusConfigurator.class);
+
+    /** Maps {@code config} and its "oauth" child (else the global "gplus.oauth") onto the pojo. */
+    public static GPlusConfiguration detectConfiguration(Config config) {
+        Config oauth = config.hasPath("oauth")
+                ? config.getConfig("oauth") : StreamsConfigurator.config.getConfig("gplus.oauth");
+        GPlusConfiguration gplusConfiguration = new GPlusConfiguration();
+        gplusConfiguration.setProtocol(config.getString("protocol"));
+        gplusConfiguration.setHost(config.getString("host"));
+        gplusConfiguration.setPort(config.getLong("port"));
+        gplusConfiguration.setVersion(config.getString("version"));
+        GPlusOAuthConfiguration gPlusOAuthConfiguration = new GPlusOAuthConfiguration();
+        if (oauth.hasPath("appName")) {
+            gPlusOAuthConfiguration.setAppName(oauth.getString("appName"));
+        }
+        gPlusOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
+        gPlusOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
+        gPlusOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
+        gPlusOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
+        gplusConfiguration.setOauth(gPlusOAuthConfiguration);
+
+        return gplusConfiguration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
new file mode 100644
index 0000000..579cdbf
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
@@ -0,0 +1,79 @@
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Drains JSON strings from an input queue and offers each to the output queue, unchanged when
+ * the target class is String, otherwise converted to a Streams {@link Activity}.
+ */
+public class GPlusEventProcessor implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GPlusEventProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+    private Random random = new Random();
+
+    private BlockingQueue<String> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private GPlusActivitySerializer gPlusActivitySerializer = new GPlusActivitySerializer();
+
+    /** Poison pill, matched by identity so a payload reading "TERMINATE" is still processed. */
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.outClass = outClass;
+    }
+
+    @Override
+    public void run() {
+        while(true) {
+            try {
+                String item = inQueue.take();
+                if(item==TERMINATE) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+                Thread.sleep(random.nextInt(100));
+                // first check for valid json
+                ObjectNode node = (ObjectNode)mapper.readTree(item);
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    outQueue.offer(new StreamsDatum(item));
+                } else {
+                    // convert to desired format
+                    com.google.api.services.plus.model.Activity gplusActivity = mapper.readValue(item, com.google.api.services.plus.model.Activity.class);
+                    Activity streamsActivity = gPlusActivitySerializer.deserialize(gplusActivity);
+                    outQueue.offer(new StreamsDatum(streamsActivity));
+                }
+            } catch (InterruptedException e) {
+                // restore the flag and stop; swallowing it would make the worker unstoppable
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                LOGGER.warn("Unable to process item; skipping it", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
new file mode 100644
index 0000000..1176843
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
@@ -0,0 +1,88 @@
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.ActivityFeed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class GPlusHistoryProviderTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GPlusHistoryProviderTask.class);
+
+    private ObjectMapper mapper;
+
+    private GPlusProvider provider;
+    private String userid;
+    private String circle;
+
+    public GPlusHistoryProviderTask(GPlusProvider provider, String userid, String circle) {
+        this.provider = provider;
+        this.userid = userid;
+        this.circle = circle;
+    }
+
+    @Override
+    public void run() {
+
+        Plus.Activities.List listActivities = null;
+        try {
+            listActivities = provider.plus.activities().list(userid, circle);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+        listActivities.setMaxResults(100L);
+
+// Execute the request for the first page
+        ActivityFeed activityFeed = null;
+        try {
+            activityFeed = listActivities.execute();
+        } catch (IOException e) {
+            e.printStackTrace();
+            return;
+        }
+
+// Unwrap the request and extract the pieces we want
+        List<com.google.api.services.plus.model.Activity> activities = activityFeed.getItems();
+
+// Loop through until we arrive at an empty page
+        while (activities != null) {
+            for (com.google.api.services.plus.model.Activity gplusActivity : activities) {
+                String json = null;
+                try {
+                    json = mapper.writeValueAsString(gplusActivity);
+                } catch (JsonProcessingException e) {
+                    e.printStackTrace();
+                }
+                provider.inQueue.offer(json);
+            }
+
+            // We will know we are on the last page when the next page token is null.
+            // If this is the case, break.
+            if (activityFeed.getNextPageToken() == null) {
+                break;
+            }
+
+            // Prepare to request the next page of activities
+            listActivities.setPageToken(activityFeed.getNextPageToken());
+
+            // Execute and process the next page request
+            try {
+                activityFeed = listActivities.execute();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            activities = activityFeed.getItems();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
new file mode 100644
index 0000000..523bd46
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
@@ -0,0 +1,166 @@
+package com.google.gplus.provider;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.auth.oauth2.GoogleTokenResponse;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.plus.Plus;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.*;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class GPlusProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GPlusProvider.class);
+
+    private GPlusConfiguration config;
+
+    private Class klass;
+
+    public GPlusConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(GPlusConfiguration config) {
+        this.config = config;
+    }
+
+    protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
+
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    public BlockingQueue<Object> getInQueue() {
+        return inQueue;
+    }
+
+    private static final HttpTransport TRANSPORT = new NetHttpTransport();
+    private static final JacksonFactory JSON_FACTORY = new JacksonFactory();
+    private static final Gson GSON = new Gson();
+
+    protected GoogleClientSecrets clientSecrets;
+    protected GoogleCredential credential;
+    protected Plus plus;
+
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+    ListenableFuture providerTaskComplete;
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public GPlusProvider() {
+        Config config = StreamsConfigurator.config.getConfig("gplus");
+        this.config = GPlusConfigurator.detectConfiguration(config);
+    }
+
+    public GPlusProvider(GPlusConfiguration config) {
+        this.config = config;
+    }
+
+    public GPlusProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("gplus");
+        this.config = GPlusConfigurator.detectConfiguration(config);
+        this.klass = klass;
+    }
+
+    public GPlusProvider(GPlusConfiguration config, Class klass) {
+        this.config = config;
+        this.klass = klass;
+    }
+
+    @Override
+    public void startStream() {
+
+        providerTaskComplete = executor.submit(new GPlusHistoryProviderTask(this, "me", "public"));
+
+        for (int i = 0; i < 1; i++) {
+            new Thread(new GPlusEventProcessor(inQueue, providerQueue, klass));
+        }
+
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        startStream();
+
+        while( !providerTaskComplete.isDone()) {
+            try {
+                Thread.sleep(new Random().nextInt(100));
+            } catch (InterruptedException e) { }
+        }
+
+        return new StreamsResultSet(providerQueue);
+
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        Preconditions.checkNotNull(this.klass);
+
+        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+        Preconditions.checkNotNull(config.getOauth().getAccessToken());
+        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+
+        try {
+            credential = new GoogleCredential.Builder()
+                    .setJsonFactory(JSON_FACTORY)
+                    .setTransport(TRANSPORT)
+                    .setClientSecrets(config.getOauth().getConsumerKey(), config.getOauth().getConsumerSecret()).build()
+                    .setFromTokenResponse(JSON_FACTORY.fromString(
+                            config.getOauth().getAccessToken(), GoogleTokenResponse.class));
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void cleanUp() {
+        for (int i = 0; i < 1; i++) {
+            inQueue.add(GPlusEventProcessor.TERMINATE);
+        }
+
+        try {
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
new file mode 100644
index 0000000..e2d8130
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
@@ -0,0 +1,58 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.google.gplus.GPlusConfiguration",
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "The protocol"
+        },
+        "host": {
+            "type": "string",
+            "description": "The host"
+        },
+        "port": {
+            "type": "integer",
+            "description": "The port"
+        },
+        "version": {
+            "type": "string",
+            "description": "The version"
+        },
+        "endpoint": {
+            "type": "string",
+            "description": "The endpoint"
+        },
+        "follow": {
+            "type": "array",
+            "description": "A list of user names, indicating the users whose activities should be delivered on the stream",
+            "items": {
+                "type": "string"
+            }
+        },
+        "oauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.google.gplus.GPlusOAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "appName": {
+                    "type": "string"
+                },
+                "consumerKey": {
+                    "type": "string"
+                },
+                "consumerSecret": {
+                    "type": "string"
+                },
+                "accessToken": {
+                    "type": "string"
+                },
+                "accessTokenSecret": {
+                    "type": "string"
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/resources/reference.conf b/streams-contrib/streams-provider-google/google-gplus/src/main/resources/reference.conf
new file mode 100644
index 0000000..c6c1e01
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/resources/reference.conf
@@ -0,0 +1,11 @@
+gplus {
+    protocol = "https"
+    host = "www.googleapis.com/plus"
+    port = 443
+    version = "v1"
+    endpoint = "people/me/activities/public?maxResults=100"
+    filter-level = "none"
+    oauth {
+        appName = "Apache Streams"
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
new file mode 100644
index 0000000..e9641fc
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
@@ -0,0 +1,53 @@
+package com.google.gmail.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sblackmon
+ * Date: 8/20/13
+ * Time: 5:57 PM
+ * To change this template use File | Settings | File Templates.
+ */
+@Ignore
+public class GMailMessageSerDeTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @Ignore
+    @Test
+    public void Tests()
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        try {
+            while (br.ready()) {
+                String line = br.readLine();
+                LOGGER.debug(line);
+
+                // implement
+            }
+        } catch( Exception e ) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+}