You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/21 20:44:51 UTC

[5/5] git commit: adding google provider, tweaks to localbuilder and hdfs reader

adding google provider, tweaks to localbuilder and hdfs reader


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/61592dc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/61592dc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/61592dc0

Branch: refs/heads/STREAMS-26
Commit: 61592dc0742bcae969c0c34c9d4158d5d164eddc
Parents: 2ec7fe8
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Mar 21 14:44:13 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Fri Mar 21 14:44:13 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 streams-contrib/pom.xml                         |   1 +
 .../streams-components-test.iml                 |  39 +++-
 .../apache/streams/hdfs/HdfsConfigurator.java   |  15 ++
 .../streams/hdfs/WebHdfsPersistReader.java      |  63 ++----
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |  49 ++---
 .../google-gmail/pom.xml                        | 135 +++++++++++++
 .../com/google/gmail/GMailConfigurator.java     |  27 +++
 .../gmail/provider/GMailImapProviderTask.java   |  58 ++++++
 .../GMailMessageActivitySerializer.java         | 194 +++++++++++++++++++
 .../google/gmail/provider/GMailProvider.java    | 131 +++++++++++++
 .../gmail/provider/GMailRssProviderTask.java    |  36 ++++
 .../com/google/gmail/GMailConfiguration.json    |  14 ++
 .../gmail/test/GMailMessageSerDeTest.java       |  53 +++++
 .../src/test/resources/datasift_jsons.txt       | 101 ++++++++++
 .../google-gplus/pom.xml                        | 145 ++++++++++++++
 .../gplus/provider/GPlusActivitySerializer.java | 117 +++++++++++
 .../gplus/provider/GPlusConfigurator.java       |  36 ++++
 .../gplus/provider/GPlusEventProcessor.java     |  79 ++++++++
 .../provider/GPlusHistoryProviderTask.java      |  88 +++++++++
 .../google/gplus/provider/GPlusProvider.java    | 166 ++++++++++++++++
 .../com/google/gplus/GPlusConfiguration.json    |  58 ++++++
 .../src/main/resources/reference.conf           |  11 ++
 .../gmail/test/GMailMessageSerDeTest.java       |  53 +++++
 .../src/test/resources/datasift_jsons.txt       | 101 ++++++++++
 streams-contrib/streams-provider-google/pom.xml |  44 +++++
 .../core/builders/LocalStreamBuilder.java       |  11 +-
 .../streams/core/tasks/BaseStreamsTask.java     |   2 +-
 .../core/tasks/LocalStreamMonitorThread.java    |  69 +++++++
 29 files changed, 1821 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1fec2b7..b358dc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
         <jaxbutil.version>1.2.6</jaxbutil.version>
         <junit.version>4.11</junit.version>
         <slf4j.version>1.7.6</slf4j.version>
-        <logback.version>1.0.9</logback.version>
+        <logback.version>1.1.1</logback.version>
         <commons-io.version>2.4</commons-io.version>
         <commons-lang3.version>3.1</commons-lang3.version>
         <typesafe.config.version>1.2.0</typesafe.config.version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 896bb9b..239644b 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -43,6 +43,7 @@
         <module>streams-persist-mongo</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>
+        <module>streams-provider-google</module>
         <module>streams-provider-gnip</module>
         <module>streams-provider-moreover</module>
         <module>streams-provider-twitter</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-components-test/streams-components-test.iml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-components-test/streams-components-test.iml b/streams-contrib/streams-components-test/streams-components-test.iml
index fbec879..60ae74a 100644
--- a/streams-contrib/streams-components-test/streams-components-test.iml
+++ b/streams-contrib/streams-components-test/streams-components-test.iml
@@ -11,11 +11,44 @@
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
     <orderEntry type="module" module-name="streams-core (1)" />
+    <orderEntry type="library" name="Maven: joda-time:joda-time:2.2" level="project" />
+    <orderEntry type="module" module-name="streams-util (1)" />
+    <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
+    <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
+    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
+    <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.1.1" level="project" />
+    <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.1.1" level="project" />
+    <orderEntry type="module" module-name="streams-pojo (1)" />
+    <orderEntry type="library" name="Maven: org.jsonschema2pojo:jsonschema2pojo-core:0.4.0" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.codemodel:codemodel:2.6" level="project" />
+    <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" />
+    <orderEntry type="library" name="Maven: javax.validation:validation-api:1.0.0.GA" level="project" />
+    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.11" level="project" />
+    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.11" level="project" />
+    <orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" />
+    <orderEntry type="library" name="Maven: com.google.code.findbugs:annotations:1.3.9" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:3.1.1" level="project" />
+    <orderEntry type="library" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml:jackson-xml-databind:0.5.0" level="project" />
+    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-xc:1.7.0" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml:aalto-xml:0.9.9" level="project" />
+    <orderEntry type="library" name="Maven: nz.net.ultraq.jaxb:jaxb-utilities:1.2.6" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.2.7" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-core:2.2.7" level="project" />
+    <orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.7" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.istack:istack-commons-runtime:2.16" level="project" />
+    <orderEntry type="library" name="Maven: com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
+    <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
+    <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
+    <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-json-org:2.2.1" level="project" />
+    <orderEntry type="library" name="Maven: org.json:json:20090211" level="project" />
     <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
     <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
-    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
-    <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
-    <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
   </component>
   <component name="POM File Configuration" pomFile="" />
 </module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
index dc2b338..dfbc273 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
@@ -1,5 +1,6 @@
 package org.apache.streams.hdfs;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -11,6 +12,8 @@ public class HdfsConfigurator {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(HdfsConfigurator.class);
 
+    private final static ObjectMapper mapper = new ObjectMapper();
+
     public static HdfsConfiguration detectConfiguration(Config hdfs) {
         String host = hdfs.getString("host");
         Long port = hdfs.getLong("port");
@@ -29,4 +32,22 @@ public class HdfsConfigurator {
         return hdfsConfiguration;
     }
 
+    /**
+     * Builds the base HDFS settings with {@link #detectConfiguration(Config)} and adds the
+     * {@code readerPath} entry of the same config section.
+     *
+     * @param hdfs config section holding the HDFS connection settings and {@code readerPath}
+     * @return a reader configuration carrying the detected settings plus the reader path
+     */
+    public static HdfsReaderConfiguration detectReaderConfiguration(Config hdfs) {
+
+        // Convert the shared HDFS settings into the reader-specific configuration type.
+        HdfsReaderConfiguration readerConfiguration =
+                mapper.convertValue(detectConfiguration(hdfs), HdfsReaderConfiguration.class);
+
+        readerConfiguration.setReaderPath(hdfs.getString("readerPath"));
+
+        return readerConfiguration;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 511f684..e01d9d4 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -1,13 +1,14 @@
 package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.core.StreamsDatum;
@@ -17,16 +18,16 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Created by sblackmon on 2/28/14.
@@ -117,11 +118,19 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
         connectToWebHDFS();
         path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath());
         try {
-            status = client.listStatus(path);
+            if( client.isFile(path)) {
+                FileStatus fileStatus = client.getFileStatus(path);
+                status = new FileStatus[1];
+                status[0] = fileStatus;
+            } else if( client.isDirectory(path)){
+                status = client.listStatus(path);
+            } else {
+                LOGGER.error("Neither file nor directory, wtf");
+            }
         } catch (IOException e) {
             e.printStackTrace();
         }
-        persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        persistQueue = new LinkedBlockingQueue<StreamsDatum>(10000);
         executor = Executors.newSingleThreadExecutor();
     }
 
@@ -132,7 +141,12 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
 
+    /**
+     * Starts the stream via {@code startStream()} and returns a result set wrapping persistQueue.
+     * NOTE(review): if startStream() only schedules the read on the executor, the queue may
+     * still be filling when this returns — confirm against startStream().
+     */
     @Override
     public StreamsResultSet readAll() {
-        readSourceWritePersistQueue();
+        startStream();
         return new StreamsResultSet(persistQueue);
     }
 
@@ -167,40 +181,4 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
         return null;
     }
 
-    private void readSourceWritePersistQueue() {
-        for( FileStatus fileStatus : status ) {
-            BufferedReader reader;
-            LOGGER.info("Found " + fileStatus.getPath().getName());
-            if( persistQueue.size() > 0 ) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                }
-            }
-            if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
-                LOGGER.info("Processing " + fileStatus.getPath().getName());
-                try {
-                    reader = new BufferedReader(new InputStreamReader(client.open(fileStatus.getPath())));
-
-                    String line = "";
-                    do{
-                        try {
-                            line = reader.readLine();
-                            if( !Strings.isNullOrEmpty(line) ) {
-                                String[] fields = line.split(Character.toString(DELIMITER));
-                                StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(fields[2]));
-                                persistQueue.offer(entry);
-                            }
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                            LOGGER.warn(e.getMessage());
-                        }
-                    } while( line != null );
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    LOGGER.warn(e.getMessage());
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index f0bee1f..dc6ea16 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -3,15 +3,11 @@ package org.apache.streams.hdfs;
 import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.streams.core.StreamsDatum;
-import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.Calendar;
-import java.util.Random;
 
 public class WebHdfsPersistReaderTask implements Runnable {
 
@@ -28,30 +24,46 @@ public class WebHdfsPersistReaderTask implements Runnable {
 
         for( FileStatus fileStatus : reader.status ) {
             BufferedReader bufferedReader;
-
+            LOGGER.info("Found " + fileStatus.getPath().getName());
             if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
+                LOGGER.info("Processing " + fileStatus.getPath().getName());
                 try {
                     bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath())));
+                } catch (Exception e) {
+                    // Skip a file that cannot be opened, but keep processing the remaining files.
+                    LOGGER.error("Failed opening " + fileStatus.getPath().getName(), e);
+                    continue;
+                }
 
-                    String line = "";
-                    do{
-                        try {
-                            line = bufferedReader.readLine();
-                            if( !Strings.isNullOrEmpty(line) ) {
-                                String[] fields = line.split(Character.toString(reader.DELIMITER));
-                                Calendar cal = Calendar.getInstance();
-                                cal.setTimeInMillis(new Long(fields[2]));
-                                StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(cal.getTime()));
-                                reader.persistQueue.offer(entry);
+                try {
+                    String line;
+                    // Read to end of stream (readLine() returns null); blank lines are skipped, not treated as EOF.
+                    while( (line = bufferedReader.readLine()) != null ) {
+                        if( Strings.isNullOrEmpty(line) ) {
+                            continue;
+                        }
+                        try {
+                            String[] fields = line.split(Character.toString(reader.DELIMITER));
+                            StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+                            // persistQueue may be bounded; yield until the consumer frees capacity.
+                            while( !reader.persistQueue.offer(entry) ) {
+                                Thread.yield();
                             }
                         } catch (Exception e) {
-                            LOGGER.warn("Failed reading " + line);
+                            LOGGER.warn("Failed parsing line: " + line, e);
                         }
-                    } while( line != null );
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    break;
-                }
+                    }
+                } catch (Exception e) {
+                    // A read failure ends this file instead of retrying readLine() forever.
+                    LOGGER.warn("Failed reading " + fileStatus.getPath().getName(), e);
+                } finally {
+                    try {
+                        bufferedReader.close();
+                    } catch (Exception e) {
+                        LOGGER.warn("Failed closing " + fileStatus.getPath().getName(), e);
+                    }
+                }
+                LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/pom.xml b/streams-contrib/streams-provider-google/google-gmail/pom.xml
new file mode 100644
index 0000000..84e0346
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-provider-google</artifactId>
+        <version>0.1.STREAMS26-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>google-gmail</artifactId>
+
+    <repositories>
+        <repository>
+            <id>typesafe</id>
+            <name>typesafe</name>
+            <url>http://repo.typesafe.com/typesafe/repo</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path-assert</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.gmail4j</groupId>
+            <artifactId>gmail4j</artifactId>
+            <version>0.5-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/com/google/gmail</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.google.gmail.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>false</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
new file mode 100644
index 0000000..2d2d2aa
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
@@ -0,0 +1,41 @@
+package com.google.gmail;
+
+import com.google.gmail.GMailConfiguration;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import org.apache.streams.config.StreamsConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ *
+ * Builds {@link GMailConfiguration} instances from a Typesafe {@link Config} section.
+ */
+public class GMailConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailConfigurator.class);
+
+    /**
+     * Copies the "username" and "password" entries of {@code gmail} into a new
+     * {@link GMailConfiguration}.
+     *
+     * @param gmail config section holding the GMail credentials
+     * @return a configuration carrying the user name and password
+     * @throws ConfigException.Missing if either entry is absent or null
+     * @throws ConfigException.WrongType if either entry is not convertible to a string
+     */
+    public static GMailConfiguration detectConfiguration(Config gmail) {
+
+        GMailConfiguration configuration = new GMailConfiguration();
+
+        String userName = gmail.getString("username");
+        configuration.setUserName(userName);
+
+        String password = gmail.getString("password");
+        configuration.setPassword(password);
+
+        return configuration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
new file mode 100644
index 0000000..068c214
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
@@ -0,0 +1,68 @@
+package com.google.gmail.provider;
+
+import com.googlecode.gmail4j.GmailClient;
+import com.googlecode.gmail4j.GmailMessage;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ *
+ * Queries the provider's IMAP client with the DATE_GT strategy for 2000-01-01 00:00:00.000
+ * (built in the JVM default time zone), converts each returned message to an
+ * {@link Activity} and offers it to the provider's queue.
+ */
+public class GMailImapProviderTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailImapProviderTask.class);
+
+    private GMailProvider provider;
+
+    public GMailImapProviderTask(GMailProvider provider) {
+        this.provider = provider;
+    }
+
+    @Override
+    public void run() {
+
+        // Search bound: midnight at the start of January 1st, 2000.
+        // DAY_OF_MONTH is 1-based (0 would roll back to December 31st, 1999) and
+        // HOUR_OF_DAY is used because HOUR is the 12-hour AM/PM field.
+        Calendar calendar = new GregorianCalendar();
+
+        calendar.set(Calendar.YEAR, 2000);
+        calendar.set(Calendar.MONTH, Calendar.JANUARY);
+        calendar.set(Calendar.DAY_OF_MONTH, 1);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+
+        // NOTE(review): DATE_GT receives Date.toString() output here; confirm this
+        // is the date format gmail4j expects for this search strategy.
+        final List<GmailMessage> messages = this.provider.imapClient.getMessagesBy(
+                GmailClient.EmailSearchStrategy.DATE_GT,
+                calendar.getTime().toString()
+        );
+
+        for (GmailMessage message : messages) {
+
+            GMailMessageActivitySerializer serializer = new GMailMessageActivitySerializer( this.provider );
+            Activity activity = serializer.deserialize(message);
+            StreamsDatum entry = new StreamsDatum(activity);
+            // offer() reports rejection through its return value; surface drops instead of losing them silently.
+            if( !this.provider.providerQueue.offer(entry) ) {
+                LOGGER.warn("Provider queue rejected a GMail message activity; it was dropped");
+            }
+
+        }
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
new file mode 100644
index 0000000..6c49bda
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
@@ -0,0 +1,194 @@
+package com.google.gmail.provider;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.googlecode.gmail4j.GmailException;
+import com.googlecode.gmail4j.GmailMessage;
+import com.googlecode.gmail4j.javamail.JavaMailGmailMessage;
+import com.sun.mail.imap.IMAPFolder;
+import com.sun.mail.imap.IMAPMessage;
+import com.sun.mail.imap.IMAPSSLStore;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.*;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.internet.MimeMultipart;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ * Converts gmail4j {@link GmailMessage}s into {@link Activity} objects with the verb
+ * {@code "email"}.
+ *
+ * Only the message-to-activity direction is implemented; {@link #serialize(Activity)}
+ * returns {@code null}.
+ *
+ * Originally created by mdelaet on 9/30/13.
+ */
+public class GMailMessageActivitySerializer implements ActivitySerializer<GmailMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GMailMessageActivitySerializer.class);
+
+    // Supplies the account user name that is part of every activity id.
+    GMailProvider provider;
+
+    // Configured by the provider-aware constructor; not otherwise used in this class.
+    ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * @param provider provider whose configured user name is used when building activity ids
+     */
+    public GMailMessageActivitySerializer(GMailProvider provider) {
+
+        this.provider = provider;
+
+        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
+
+        // Apply MessageMixIn so Jackson skips the store/folder/parent accessors of these types.
+        mapper.addMixInAnnotations(IMAPSSLStore.class, MessageMixIn.class);
+        mapper.addMixInAnnotations(IMAPFolder.class, MessageMixIn.class);
+        mapper.addMixInAnnotations(IMAPMessage.class, MessageMixIn.class);
+        mapper.addMixInAnnotations(MimeMultipart.class, MessageMixIn.class);
+        mapper.addMixInAnnotations(JavaMailGmailMessage.class, MessageMixIn.class);
+
+    }
+
+    /**
+     * Creates a serializer with no provider and an unconfigured mapper.
+     * {@link #deserialize(GmailMessage)} cannot build activity ids on such an instance
+     * and throws {@link IllegalStateException}.
+     */
+    public GMailMessageActivitySerializer() {
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "gmail.v1";
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    @Override
+    public GmailMessage serialize(Activity activity) {
+        return null;
+    }
+
+    /**
+     * Builds an email {@link Activity} from a gmail4j message.
+     *
+     * The actor is the sender. The object is the first "To" recipient; the object is left
+     * empty when there are no recipients. Failures reading recipients or the body
+     * ({@link GmailException}) are logged, and a partial activity is still returned.
+     *
+     * @param gmailMessage the message to convert
+     * @return the converted activity
+     * @throws IllegalStateException if this serializer was created without a provider
+     */
+    @Override
+    public Activity deserialize(GmailMessage gmailMessage) {
+
+        if (this.provider == null) {
+            throw new IllegalStateException("A GMailProvider is required to build GMail activity ids");
+        }
+
+        Activity activity = new Activity();
+        activity.setId(formatId(this.provider.getConfig().getUserName(), String.valueOf(gmailMessage.getMessageNumber())));
+        activity.setPublished(gmailMessage.getSendDate());
+
+        // Named activityProvider to avoid shadowing the GMailProvider field.
+        Provider activityProvider = new Provider();
+        activityProvider.setId("http://gmail.com");
+        activityProvider.setDisplayName("GMail");
+        activity.setProvider(activityProvider);
+
+        Actor actor = new Actor();
+        actor.setId(gmailMessage.getFrom().getEmail());
+        actor.setDisplayName(gmailMessage.getFrom().getName());
+        activity.setActor(actor);
+        activity.setVerb("email");
+
+        ActivityObject object = new ActivityObject();
+        try {
+            // Without this guard an empty recipient list raises IndexOutOfBoundsException,
+            // which the GmailException handler below would not catch.
+            if (!gmailMessage.getTo().isEmpty()) {
+                object.setId(gmailMessage.getTo().get(0).getEmail());
+                object.setDisplayName(gmailMessage.getTo().get(0).getName());
+            }
+        } catch( GmailException e ) {
+            LOGGER.warn("Unable to read GMail message recipients", e);
+        }
+        activity.setTitle(gmailMessage.getSubject());
+        try {
+            activity.setContent(gmailMessage.getContentText());
+        } catch( GmailException e ) {
+            LOGGER.warn("Unable to read GMail message content", e);
+        }
+        activity.setObject(object);
+
+        return activity;
+    }
+
+    /**
+     * Converts each message with {@link #deserialize(GmailMessage)}, preserving order.
+     *
+     * @param serializedList messages to convert; {@code null} yields an empty list
+     * @return one activity per input message
+     */
+    @Override
+    public List<Activity> deserializeAll(List<GmailMessage> serializedList) {
+        List<Activity> activities = Lists.newArrayList();
+        if (serializedList == null) {
+            return activities;
+        }
+        for (GmailMessage message : serializedList) {
+            activities.add(deserialize(message));
+        }
+        return activities;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    public Activity convert(ObjectNode event) {
+        return null;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    public static Generator buildGenerator(ObjectNode event) {
+        return null;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    public static Icon getIcon(ObjectNode event) {
+        return null;
+    }
+
+    /**
+     * Returns a provider with id {@code "id:providers:gmail"}; {@code event} is ignored.
+     * Note that {@link #deserialize(GmailMessage)} uses a different provider id
+     * ({@code "http://gmail.com"}).
+     */
+    public static Provider buildProvider(ObjectNode event) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:gmail");
+        return provider;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    public static List<Object> getLinks(ObjectNode event) {
+        return null;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    public static String getUrls(ObjectNode event) {
+        return null;
+    }
+
+    /** Stores the raw message under the {@code "gmail"} key of the activity's extensions map. */
+    public static void addGMailExtension(Activity activity, GmailMessage gmailMessage) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        extensions.put("gmail", gmailMessage);
+    }
+
+    /** Joins {@code "id:gmail"} and the given parts with {@code ':'}. */
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:gmail", idparts));
+    }
+
+    /**
+     * Jackson mix-in that marks the store, folder, namespace and parent accessors of the
+     * mixed-in types as ignored, so they are not serialized.
+     */
+    interface MessageMixIn {
+        @JsonManagedReference
+        @JsonIgnore
+        IMAPSSLStore getDefaultFolder();
+        @JsonManagedReference
+        @JsonIgnore
+        IMAPSSLStore getPersonalNamespaces();
+        @JsonManagedReference
+        @JsonIgnore
+        IMAPFolder getStore();
+        @JsonManagedReference
+        @JsonIgnore
+        @JsonBackReference
+        IMAPMessage getFolder();
+        @JsonManagedReference
+        @JsonIgnore
+        @JsonProperty("parent")
+        @JsonBackReference
+        MimeMultipart getParent();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
new file mode 100644
index 0000000..7ec157e
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -0,0 +1,131 @@
+package com.google.gmail.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gmail.GMailConfiguration;
+import com.google.gmail.GMailConfigurator;
+import com.googlecode.gmail4j.GmailClient;
+import com.googlecode.gmail4j.GmailConnection;
+import com.googlecode.gmail4j.http.HttpGmailConnection;
+import com.googlecode.gmail4j.javamail.ImapGmailClient;
+import com.googlecode.gmail4j.javamail.ImapGmailConnection;
+import com.googlecode.gmail4j.rss.RssGmailClient;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.*;
+
+/**
+ * {@link StreamsProvider} for a single GMail account.
+ *
+ * {@link #prepare(Object)} builds RSS and IMAP gmail4j clients from the configured user name
+ * and password. {@link #startStream()} runs a {@link GMailImapProviderTask} on this provider's
+ * executor; that task offers converted activities to {@code providerQueue}. Call
+ * {@link #cleanUp()} to shut the executor down and release its threads.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class GMailProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailProvider.class);
+
+    private GMailConfiguration config;
+
+    // Must be non-null for prepare() to succeed; not otherwise read in this class.
+    private Class klass;
+
+    public GMailConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(GMailConfiguration config) {
+        this.config = config;
+    }
+
+    protected BlockingQueue<Object> inQueue = new LinkedBlockingQueue<Object>(10000);
+
+    // Filled by GMailImapProviderTask / GMailRssProviderTask.
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    public BlockingQueue<Object> getInQueue() {
+        return inQueue;
+    }
+
+    protected GmailClient rssClient;
+    protected ImapGmailClient imapClient;
+
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+    /**
+     * Creates a fixed-size pool backed by a bounded, fair work queue. Tasks rejected because
+     * the queue is full run on the submitting thread (CallerRunsPolicy).
+     */
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    /** Reads the {@code gmail} section of the Streams configuration. */
+    public GMailProvider() {
+        Config config = StreamsConfigurator.config.getConfig("gmail");
+        this.config = GMailConfigurator.detectConfiguration(config);
+    }
+
+    public GMailProvider(GMailConfiguration config) {
+        this.config = config;
+    }
+
+    /** Reads the {@code gmail} section of the Streams configuration and records {@code klass}. */
+    public GMailProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("gmail");
+        this.config = GMailConfigurator.detectConfiguration(config);
+        this.klass = klass;
+    }
+
+    public GMailProvider(GMailConfiguration config, Class klass) {
+        this.config = config;
+        this.klass = klass;
+    }
+
+    /**
+     * Runs a {@link GMailImapProviderTask} on this provider's executor. execute() is used
+     * instead of submit() so an exception thrown by the task reaches the worker thread's
+     * uncaught-exception handler rather than being captured in an ignored future.
+     */
+    @Override
+    public void startStream() {
+        executor.execute(new GMailImapProviderTask(this));
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    @Override
+    public StreamsResultSet readCurrent() {
+        return null;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    /** Not implemented: always returns {@code null}. */
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    /**
+     * Builds the RSS and IMAP gmail4j clients from the configured credentials.
+     * {@code configurationObject} is ignored.
+     *
+     * @throws NullPointerException if {@code klass}, the user name or the password is null
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        Preconditions.checkNotNull(this.klass);
+
+        Preconditions.checkNotNull(config.getUserName());
+        Preconditions.checkNotNull(config.getPassword());
+
+        rssClient = new RssGmailClient();
+        GmailConnection rssConnection = new HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray());
+        rssClient.setConnection(rssConnection);
+
+        imapClient = new ImapGmailClient();
+        GmailConnection imapConnection = new ImapGmailConnection();
+        imapConnection.setLoginCredentials(config.getUserName(), config.getPassword().toCharArray());
+        imapClient.setConnection(imapConnection);
+    }
+
+    /**
+     * Shuts the executor down and waits up to 5 seconds for running tasks. Tasks still running
+     * after that are interrupted.
+     */
+    @Override
+    public void cleanUp() {
+        // awaitTermination only returns early after shutdown(); without it this always waited 5s.
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOGGER.warn("GMail provider tasks did not finish within 5 seconds; interrupting them");
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            // Preserve the interrupt for the caller.
+            Thread.currentThread().interrupt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
new file mode 100644
index 0000000..73b6d77
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
@@ -0,0 +1,36 @@
+package com.google.gmail.provider;
+
+import com.googlecode.gmail4j.GmailMessage;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Runnable that takes every unread message reported by the provider's RSS client and
+ * offers it to the provider queue. Each raw {@link GmailMessage} is wrapped in a
+ * {@link StreamsDatum} with no conversion.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class GMailRssProviderTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailRssProviderTask.class);
+
+    private final GMailProvider provider;
+
+    /**
+     * @param provider supplies the RSS client to read from and the queue to write to
+     */
+    public GMailRssProviderTask(GMailProvider provider) {
+        this.provider = provider;
+    }
+
+    /** Reads the unread messages once and enqueues each of them, in list order. */
+    @Override
+    public void run() {
+        List<GmailMessage> unread = provider.rssClient.getUnreadMessages();
+        for (GmailMessage gmailMessage : unread) {
+            provider.providerQueue.offer(new StreamsDatum(gmailMessage));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
new file mode 100644
index 0000000..b25d5e0
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
@@ -0,0 +1,14 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "com.google.gmail.GMailConfiguration",
+    "properties": {
+        "userName": {
+            "type": "string"
+        },
+        "password": {
+            "type": "string"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
new file mode 100644
index 0000000..e9641fc
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
@@ -0,0 +1,53 @@
+package com.google.gmail.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sblackmon
+ * Date: 8/20/13
+ * Time: 5:57 PM
+ * To change this template use File | Settings | File Templates.
+ */
+@Ignore
+public class GMailMessageSerDeTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @Ignore
+    @Test
+    public void Tests()
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        try {
+            while (br.ready()) {
+                String line = br.readLine();
+                LOGGER.debug(line);
+
+                // implement
+            }
+        } catch( Exception e ) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+}