You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:40 UTC
[62/71] [abbrv] adding google provider,
tweaks to localbuilder and hdfs reader
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/pom.xml b/streams-contrib/streams-provider-google/google-gplus/pom.xml
new file mode 100644
index 0000000..99f9321
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-provider-google</artifactId>
+ <version>0.1.STREAMS26-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>google-gplus</artifactId>
+
+ <repositories>
+ <repository>
+ <id>typesafe</id>
+ <name>typesafe</name>
+ <url>http://repo.typesafe.com/typesafe/repo</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path-assert</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-plus</artifactId>
+ <version>v1-rev118-1.17.0-rc</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.api-client</groupId>
+ <artifactId>google-api-client</artifactId>
+ <version>1.17.0-rc</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client-jackson2</artifactId>
+ <version>1.17.0-rc</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema/com/google/gplus/GPlusConfiguration.json</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.google.gplus.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
new file mode 100644
index 0000000..9bb6350
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
@@ -0,0 +1,117 @@
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+* Created with IntelliJ IDEA.
+* User: mdelaet
+* Date: 9/30/13
+* Time: 9:24 AM
+* To change this template use File | Settings | File Templates.
+*/
+public class GPlusActivitySerializer implements ActivitySerializer<com.google.api.services.plus.model.Activity> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
+
+ GPlusProvider provider;
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ public GPlusActivitySerializer(GPlusProvider provider) {
+
+ this.provider = provider;
+
+ mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
+
+ }
+
+ public GPlusActivitySerializer() {
+ }
+
+ @Override
+ public String serializationFormat() {
+ return "gplus.v1";
+ }
+
+ @Override
+ public com.google.api.services.plus.model.Activity serialize(Activity deserialized) {
+ throw new NotImplementedException("Not currently implemented");
+ }
+
+ @Override
+ public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) {
+
+ // There is totally a better way to do this
+ // 1) Deep copy all jackson fields that overlap
+ // 2) Check all objects are present
+ // 3) Check essential fields have values
+ // 4) Any that don't, set them based on other fields that are present
+
+ Activity activity = new Activity();
+ activity.setId(formatId(gplusActivity.getId()));
+ activity.setPublished(new Date(gplusActivity.getPublished().getValue()));
+ Provider provider = new Provider();
+ provider.setId("http://plus.google.com");
+ provider.setDisplayName("GPlus");
+ activity.setProvider(provider);
+ Actor actor = new Actor();
+ actor.setId(gplusActivity.getActor().getId());
+ actor.setDisplayName(gplusActivity.getActor().getDisplayName());
+ actor.setUrl(gplusActivity.getActor().getUrl());
+ activity.setActor(actor);
+ activity.setVerb(gplusActivity.getVerb());
+ ActivityObject object = new ActivityObject();
+ object.setId(gplusActivity.getObject().getId());
+ object.setUrl(gplusActivity.getObject().getUrl());
+ object.setContent(gplusActivity.getObject().getContent());
+ activity.setTitle(gplusActivity.getTitle());
+ activity.setContent(gplusActivity.getObject().getContent());
+ activity.setObject(object);
+
+ return activity;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) {
+ throw new NotImplementedException("Not currently implemented");
+ }
+
+ public static Generator buildGenerator(ObjectNode event) {
+ return null;
+ }
+
+ public static Icon getIcon(ObjectNode event) {
+ return null;
+ }
+
+ public static Provider buildProvider(ObjectNode event) {
+ Provider provider = new Provider();
+ provider.setId("id:providers:gmail");
+ return provider;
+ }
+
+ public static List<Object> getLinks(ObjectNode event) {
+ return null;
+ }
+
+ public static String getUrls(ObjectNode event) {
+ return null;
+ }
+
+ public static String formatId(String... idparts) {
+ return Joiner.on(":").join(Lists.asList("id:gmail", idparts));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
new file mode 100644
index 0000000..9cd435d
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
@@ -0,0 +1,36 @@
+package com.google.gplus.provider;
+
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.GPlusOAuthConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds a {@link GPlusConfiguration} from a Typesafe {@link Config} block.
+ */
+public class GPlusConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GPlusConfigurator.class);
+
+    /**
+     * Reads the Google+ provider settings from the supplied configuration block.
+     *
+     * <p>{@code protocol}, {@code host}, {@code port} and {@code version} are read from
+     * {@code config}. OAuth settings are read from the {@code oauth} block nested in
+     * {@code config} when it is present; otherwise they fall back to the global
+     * {@code gplus.oauth} block of {@link StreamsConfigurator#config}, which is where earlier
+     * versions of this method always looked.
+     *
+     * @param config the {@code gplus} configuration block
+     * @return a populated configuration object
+     */
+    public static GPlusConfiguration detectConfiguration(Config config) {
+        // Prefer the caller's own oauth block so a non-global Config is honoured in full.
+        Config oauth = config.hasPath("oauth")
+                ? config.getConfig("oauth")
+                : StreamsConfigurator.config.getConfig("gplus.oauth");
+
+        GPlusConfiguration gplusConfiguration = new GPlusConfiguration();
+
+        gplusConfiguration.setProtocol(config.getString("protocol"));
+        gplusConfiguration.setHost(config.getString("host"));
+        gplusConfiguration.setPort(config.getLong("port"));
+        gplusConfiguration.setVersion(config.getString("version"));
+
+        GPlusOAuthConfiguration gPlusOAuthConfiguration = new GPlusOAuthConfiguration();
+        // appName is optional: copy it only when configured.
+        if (oauth.hasPath("appName")) {
+            gPlusOAuthConfiguration.setAppName(oauth.getString("appName"));
+        }
+        gPlusOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
+        gPlusOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
+        gPlusOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
+        gPlusOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
+        gplusConfiguration.setOauth(gPlusOAuthConfiguration);
+
+        return gplusConfiguration;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
new file mode 100644
index 0000000..579cdbf
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
@@ -0,0 +1,79 @@
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class GPlusEventProcessor implements Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(GPlusEventProcessor.class);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ private BlockingQueue<String> inQueue;
+ private Queue<StreamsDatum> outQueue;
+
+ private Class inClass;
+ private Class outClass;
+
+ private GPlusActivitySerializer gPlusActivitySerializer = new GPlusActivitySerializer();
+
+ public final static String TERMINATE = new String("TERMINATE");
+
+ public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
+ this.inQueue = inQueue;
+ this.outQueue = outQueue;
+ this.inClass = inClass;
+ this.outClass = outClass;
+ }
+
+ public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
+ this.inQueue = inQueue;
+ this.outQueue = outQueue;
+ this.outClass = outClass;
+ }
+
+ @Override
+ public void run() {
+
+ while(true) {
+ try {
+ String item = inQueue.take();
+ Thread.sleep(new Random().nextInt(100));
+ if(item==TERMINATE) {
+ LOGGER.info("Terminating!");
+ break;
+ }
+
+ // first check for valid json
+ ObjectNode node = (ObjectNode)mapper.readTree(item);
+
+ // if the target is string, just pass-through
+ if( String.class.equals(outClass))
+ outQueue.offer(new StreamsDatum(item));
+ else {
+ // convert to desired format
+ com.google.api.services.plus.model.Activity gplusActivity = (com.google.api.services.plus.model.Activity)mapper.readValue(item, com.google.api.services.plus.model.Activity.class);
+
+ Activity streamsActivity = gPlusActivitySerializer.deserialize(gplusActivity);
+
+ outQueue.offer(new StreamsDatum(streamsActivity));
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
new file mode 100644
index 0000000..1176843
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusHistoryProviderTask.java
@@ -0,0 +1,88 @@
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.ActivityFeed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class GPlusHistoryProviderTask implements Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(GPlusHistoryProviderTask.class);
+
+ private ObjectMapper mapper;
+
+ private GPlusProvider provider;
+ private String userid;
+ private String circle;
+
+ public GPlusHistoryProviderTask(GPlusProvider provider, String userid, String circle) {
+ this.provider = provider;
+ this.userid = userid;
+ this.circle = circle;
+ }
+
+ @Override
+ public void run() {
+
+ Plus.Activities.List listActivities = null;
+ try {
+ listActivities = provider.plus.activities().list(userid, circle);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+ listActivities.setMaxResults(100L);
+
+// Execute the request for the first page
+ ActivityFeed activityFeed = null;
+ try {
+ activityFeed = listActivities.execute();
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+
+// Unwrap the request and extract the pieces we want
+ List<com.google.api.services.plus.model.Activity> activities = activityFeed.getItems();
+
+// Loop through until we arrive at an empty page
+ while (activities != null) {
+ for (com.google.api.services.plus.model.Activity gplusActivity : activities) {
+ String json = null;
+ try {
+ json = mapper.writeValueAsString(gplusActivity);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ provider.inQueue.offer(json);
+ }
+
+ // We will know we are on the last page when the next page token is null.
+ // If this is the case, break.
+ if (activityFeed.getNextPageToken() == null) {
+ break;
+ }
+
+ // Prepare to request the next page of activities
+ listActivities.setPageToken(activityFeed.getNextPageToken());
+
+ // Execute and process the next page request
+ try {
+ activityFeed = listActivities.execute();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ activities = activityFeed.getItems();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
new file mode 100644
index 0000000..523bd46
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
@@ -0,0 +1,166 @@
+package com.google.gplus.provider;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.auth.oauth2.GoogleTokenResponse;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.plus.Plus;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.*;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class GPlusProvider implements StreamsProvider {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(GPlusProvider.class);
+
+ private GPlusConfiguration config;
+
+ private Class klass;
+
+ public GPlusConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(GPlusConfiguration config) {
+ this.config = config;
+ }
+
+ protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
+
+ protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+ public BlockingQueue<Object> getInQueue() {
+ return inQueue;
+ }
+
+ private static final HttpTransport TRANSPORT = new NetHttpTransport();
+ private static final JacksonFactory JSON_FACTORY = new JacksonFactory();
+ private static final Gson GSON = new Gson();
+
+ protected GoogleClientSecrets clientSecrets;
+ protected GoogleCredential credential;
+ protected Plus plus;
+
+ protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+ ListenableFuture providerTaskComplete;
+
+ private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+ return new ThreadPoolExecutor(nThreads, nThreads,
+ 5000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+
+ public GPlusProvider() {
+ Config config = StreamsConfigurator.config.getConfig("gplus");
+ this.config = GPlusConfigurator.detectConfiguration(config);
+ }
+
+ public GPlusProvider(GPlusConfiguration config) {
+ this.config = config;
+ }
+
+ public GPlusProvider(Class klass) {
+ Config config = StreamsConfigurator.config.getConfig("gplus");
+ this.config = GPlusConfigurator.detectConfiguration(config);
+ this.klass = klass;
+ }
+
+ public GPlusProvider(GPlusConfiguration config, Class klass) {
+ this.config = config;
+ this.klass = klass;
+ }
+
+ @Override
+ public void startStream() {
+
+ providerTaskComplete = executor.submit(new GPlusHistoryProviderTask(this, "me", "public"));
+
+ for (int i = 0; i < 1; i++) {
+ new Thread(new GPlusEventProcessor(inQueue, providerQueue, klass));
+ }
+
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ startStream();
+
+ while( !providerTaskComplete.isDone()) {
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException e) { }
+ }
+
+ return new StreamsResultSet(providerQueue);
+
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ Preconditions.checkNotNull(this.klass);
+
+ Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+ Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+ Preconditions.checkNotNull(config.getOauth().getAccessToken());
+ Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+
+ try {
+ credential = new GoogleCredential.Builder()
+ .setJsonFactory(JSON_FACTORY)
+ .setTransport(TRANSPORT)
+ .setClientSecrets(config.getOauth().getConsumerKey(), config.getOauth().getConsumerSecret()).build()
+ .setFromTokenResponse(JSON_FACTORY.fromString(
+ config.getOauth().getAccessToken(), GoogleTokenResponse.class));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+ for (int i = 0; i < 1; i++) {
+ inQueue.add(GPlusEventProcessor.TERMINATE);
+ }
+
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
new file mode 100644
index 0000000..e2d8130
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/jsonschema/com/google/gplus/GPlusConfiguration.json
@@ -0,0 +1,58 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.google.gplus.GPlusConfiguration",
+ "properties": {
+ "protocol": {
+ "type": "string",
+ "description": "The protocol"
+ },
+ "host": {
+ "type": "string",
+ "description": "The host"
+ },
+ "port": {
+ "type": "integer",
+ "description": "The port"
+ },
+ "version": {
+ "type": "string",
+ "description": "The version"
+ },
+ "endpoint": {
+ "type": "string",
+ "description": "The endpoint"
+ },
+ "follow": {
+ "type": "array",
+ "description": "A list of user names, indicating the users whose activities should be delivered on the stream",
+ "items": {
+ "type": "string"
+ }
+ },
+ "oauth": {
+ "type": "object",
+ "dynamic": "true",
+ "javaType" : "org.apache.streams.google.gplus.GPlusOAuthConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "appName": {
+ "type": "string"
+ },
+ "consumerKey": {
+ "type": "string"
+ },
+ "consumerSecret": {
+ "type": "string"
+ },
+ "accessToken": {
+ "type": "string"
+ },
+ "accessTokenSecret": {
+ "type": "string"
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/resources/reference.conf b/streams-contrib/streams-provider-google/google-gplus/src/main/resources/reference.conf
new file mode 100644
index 0000000..c6c1e01
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/resources/reference.conf
@@ -0,0 +1,11 @@
+# Default settings for the Google+ provider. GPlusProvider loads this block through
+# StreamsConfigurator.config.getConfig("gplus").
+gplus {
+ # protocol, host, port and version are read by GPlusConfigurator.detectConfiguration.
+ protocol = "https"
+ host = "www.googleapis.com/plus"
+ port = 443
+ version = "v1"
+ # Declared in GPlusConfiguration.json but not read by GPlusConfigurator.
+ endpoint = "people/me/activities/public?maxResults=100"
+ # NOTE(review): not declared in GPlusConfiguration.json and not read by GPlusConfigurator;
+ # possibly a leftover from another provider - confirm before relying on it.
+ filter-level = "none"
+ # GPlusConfigurator also reads consumerKey, consumerSecret, accessToken and
+ # accessTokenSecret from gplus.oauth with getString. No defaults are given here, so the
+ # application's own configuration has to supply them.
+ oauth {
+ appName = "Apache Streams"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
new file mode 100644
index 0000000..e9641fc
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
@@ -0,0 +1,53 @@
+package com.google.gmail.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sblackmon
+ * Date: 8/20/13
+ * Time: 5:57 PM
+ * To change this template use File | Settings | File Templates.
+ */
+@Ignore
+public class GMailMessageSerDeTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ @Ignore
+ @Test
+ public void Tests()
+ {
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+ InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ LOGGER.debug(line);
+
+ // implement
+ }
+ } catch( Exception e ) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+}