You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/21 20:44:51 UTC
[5/5] git commit: adding google provider,
tweaks to localbuilder and hdfs reader
adding google provider, tweaks to localbuilder and hdfs reader
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/61592dc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/61592dc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/61592dc0
Branch: refs/heads/STREAMS-26
Commit: 61592dc0742bcae969c0c34c9d4158d5d164eddc
Parents: 2ec7fe8
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Mar 21 14:44:13 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Fri Mar 21 14:44:13 2014 -0500
----------------------------------------------------------------------
pom.xml | 2 +-
streams-contrib/pom.xml | 1 +
.../streams-components-test.iml | 39 +++-
.../apache/streams/hdfs/HdfsConfigurator.java | 15 ++
.../streams/hdfs/WebHdfsPersistReader.java | 63 ++----
.../streams/hdfs/WebHdfsPersistReaderTask.java | 49 ++---
.../google-gmail/pom.xml | 135 +++++++++++++
.../com/google/gmail/GMailConfigurator.java | 27 +++
.../gmail/provider/GMailImapProviderTask.java | 58 ++++++
.../GMailMessageActivitySerializer.java | 194 +++++++++++++++++++
.../google/gmail/provider/GMailProvider.java | 131 +++++++++++++
.../gmail/provider/GMailRssProviderTask.java | 36 ++++
.../com/google/gmail/GMailConfiguration.json | 14 ++
.../gmail/test/GMailMessageSerDeTest.java | 53 +++++
.../src/test/resources/datasift_jsons.txt | 101 ++++++++++
.../google-gplus/pom.xml | 145 ++++++++++++++
.../gplus/provider/GPlusActivitySerializer.java | 117 +++++++++++
.../gplus/provider/GPlusConfigurator.java | 36 ++++
.../gplus/provider/GPlusEventProcessor.java | 79 ++++++++
.../provider/GPlusHistoryProviderTask.java | 88 +++++++++
.../google/gplus/provider/GPlusProvider.java | 166 ++++++++++++++++
.../com/google/gplus/GPlusConfiguration.json | 58 ++++++
.../src/main/resources/reference.conf | 11 ++
.../gmail/test/GMailMessageSerDeTest.java | 53 +++++
.../src/test/resources/datasift_jsons.txt | 101 ++++++++++
streams-contrib/streams-provider-google/pom.xml | 44 +++++
.../core/builders/LocalStreamBuilder.java | 11 +-
.../streams/core/tasks/BaseStreamsTask.java | 2 +-
.../core/tasks/LocalStreamMonitorThread.java | 69 +++++++
29 files changed, 1821 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1fec2b7..b358dc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
<jaxbutil.version>1.2.6</jaxbutil.version>
<junit.version>4.11</junit.version>
<slf4j.version>1.7.6</slf4j.version>
- <logback.version>1.0.9</logback.version>
+ <logback.version>1.1.1</logback.version>
<commons-io.version>2.4</commons-io.version>
<commons-lang3.version>3.1</commons-lang3.version>
<typesafe.config.version>1.2.0</typesafe.config.version>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 896bb9b..239644b 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -43,6 +43,7 @@
<module>streams-persist-mongo</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
+ <module>streams-provider-google</module>
<module>streams-provider-gnip</module>
<module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-components-test/streams-components-test.iml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-components-test/streams-components-test.iml b/streams-contrib/streams-components-test/streams-components-test.iml
index fbec879..60ae74a 100644
--- a/streams-contrib/streams-components-test/streams-components-test.iml
+++ b/streams-contrib/streams-components-test/streams-components-test.iml
@@ -11,11 +11,44 @@
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="streams-core (1)" />
+ <orderEntry type="library" name="Maven: joda-time:joda-time:2.2" level="project" />
+ <orderEntry type="module" module-name="streams-util (1)" />
+ <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
+ <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
+ <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
+ <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.1.1" level="project" />
+ <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.1.1" level="project" />
+ <orderEntry type="module" module-name="streams-pojo (1)" />
+ <orderEntry type="library" name="Maven: org.jsonschema2pojo:jsonschema2pojo-core:0.4.0" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.codemodel:codemodel:2.6" level="project" />
+ <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" />
+ <orderEntry type="library" name="Maven: javax.validation:validation-api:1.0.0.GA" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.11" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.11" level="project" />
+ <orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" />
+ <orderEntry type="library" name="Maven: com.google.code.findbugs:annotations:1.3.9" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:3.1.1" level="project" />
+ <orderEntry type="library" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml:jackson-xml-databind:0.5.0" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-xc:1.7.0" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml:aalto-xml:0.9.9" level="project" />
+ <orderEntry type="library" name="Maven: nz.net.ultraq.jaxb:jaxb-utilities:1.2.6" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.2.7" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-core:2.2.7" level="project" />
+ <orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.7" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.istack:istack-commons-runtime:2.16" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
+ <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
+ <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-json-org:2.2.1" level="project" />
+ <orderEntry type="library" name="Maven: org.json:json:20090211" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
- <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
- <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
- <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
</component>
<component name="POM File Configuration" pomFile="" />
</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
index dc2b338..dfbc273 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
@@ -1,5 +1,6 @@
package org.apache.streams.hdfs;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -11,6 +12,8 @@ public class HdfsConfigurator {
private final static Logger LOGGER = LoggerFactory.getLogger(HdfsConfigurator.class);
+ private final static ObjectMapper mapper = new ObjectMapper();
+
public static HdfsConfiguration detectConfiguration(Config hdfs) {
String host = hdfs.getString("host");
Long port = hdfs.getLong("port");
@@ -29,4 +32,16 @@ public class HdfsConfigurator {
return hdfsConfiguration;
}
+ public static HdfsReaderConfiguration detectReaderConfiguration(Config hdfs) {
+ // Copy the shared settings parsed by detectConfiguration (host, port, ...) into the reader type via Jackson; assumes every HdfsConfiguration property maps onto HdfsReaderConfiguration — TODO confirm.
+ HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs);
+ HdfsReaderConfiguration hdfsReaderConfiguration = mapper.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class);
+ // "readerPath" is required here: Config.getString throws ConfigException.Missing when the key is absent.
+ String readerPath = hdfs.getString("readerPath");
+ // WebHdfsPersistReader joins this to getPath() with "/" to locate the file or directory to read.
+ hdfsReaderConfiguration.setReaderPath(readerPath);
+
+ return hdfsReaderConfiguration;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 511f684..e01d9d4 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -1,13 +1,14 @@
package org.apache.streams.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.core.StreamsDatum;
@@ -17,16 +18,16 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by sblackmon on 2/28/14.
@@ -117,11 +118,19 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
connectToWebHDFS();
path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath());
try {
- status = client.listStatus(path);
+ if( client.isFile(path)) {
+ FileStatus fileStatus = client.getFileStatus(path);
+ status = new FileStatus[1];
+ status[0] = fileStatus;
+ } else if( client.isDirectory(path)){
+ status = client.listStatus(path);
+ } else {
+ LOGGER.error("Neither file nor directory, wtf");
+ }
} catch (IOException e) {
e.printStackTrace();
}
- persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ persistQueue = new LinkedBlockingQueue<StreamsDatum>(10000);
executor = Executors.newSingleThreadExecutor();
}
@@ -132,7 +141,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
@Override
public StreamsResultSet readAll() {
- readSourceWritePersistQueue();
+ startStream();
return new StreamsResultSet(persistQueue);
}
@@ -167,40 +176,4 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
return null;
}
- private void readSourceWritePersistQueue() {
- for( FileStatus fileStatus : status ) {
- BufferedReader reader;
- LOGGER.info("Found " + fileStatus.getPath().getName());
- if( persistQueue.size() > 0 ) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- }
- if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
- LOGGER.info("Processing " + fileStatus.getPath().getName());
- try {
- reader = new BufferedReader(new InputStreamReader(client.open(fileStatus.getPath())));
-
- String line = "";
- do{
- try {
- line = reader.readLine();
- if( !Strings.isNullOrEmpty(line) ) {
- String[] fields = line.split(Character.toString(DELIMITER));
- StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(fields[2]));
- persistQueue.offer(entry);
- }
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn(e.getMessage());
- }
- } while( line != null );
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn(e.getMessage());
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index f0bee1f..dc6ea16 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -3,15 +3,11 @@ package org.apache.streams.hdfs;
import com.google.common.base.Strings;
import org.apache.hadoop.fs.FileStatus;
import org.apache.streams.core.StreamsDatum;
-import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
-import java.io.IOException;
import java.io.InputStreamReader;
-import java.util.Calendar;
-import java.util.Random;
public class WebHdfsPersistReaderTask implements Runnable {
@@ -28,30 +24,39 @@ public class WebHdfsPersistReaderTask implements Runnable {
for( FileStatus fileStatus : reader.status ) {
BufferedReader bufferedReader;
-
+ LOGGER.info("Found " + fileStatus.getPath().getName());
if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
+ LOGGER.info("Processing " + fileStatus.getPath().getName());
try {
bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath())));
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.error(e.getMessage());
+ return;
+ }
- String line = "";
- do{
- try {
- line = bufferedReader.readLine();
- if( !Strings.isNullOrEmpty(line) ) {
- String[] fields = line.split(Character.toString(reader.DELIMITER));
- Calendar cal = Calendar.getInstance();
- cal.setTimeInMillis(new Long(fields[2]));
- StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(cal.getTime()));
- reader.persistQueue.offer(entry);
+ String line = "";
+ do{
+ try {
+ line = bufferedReader.readLine();
+ if( !Strings.isNullOrEmpty(line) ) {
+ String[] fields = line.split(Character.toString(reader.DELIMITER));
+ StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+ boolean success;
+ do {
+ success = reader.persistQueue.offer(entry);
+ Thread.yield();
}
- } catch (Exception e) {
- LOGGER.warn("Failed reading " + line);
+ while( success == false );
+
}
- } while( line != null );
- } catch (IOException e) {
- e.printStackTrace();
- break;
- }
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ }
+ } while( !Strings.isNullOrEmpty(line) );
+ LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/pom.xml b/streams-contrib/streams-provider-google/google-gmail/pom.xml
new file mode 100644
index 0000000..84e0346
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-provider-google</artifactId>
+ <version>0.1.STREAMS26-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>google-gmail</artifactId>
+
+ <repositories>
+ <repository>
+ <id>typesafe</id>
+ <name>typesafe</name>
+ <url>http://repo.typesafe.com/typesafe/repo</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path-assert</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.gmail4j</groupId>
+ <artifactId>gmail4j</artifactId>
+ <version>0.5-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema/com/google/gmail</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.google.gmail.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
new file mode 100644
index 0000000..2d2d2aa
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
@@ -0,0 +1,27 @@
+package com.google.gmail;
+
+import com.google.gmail.GMailConfiguration;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import org.apache.streams.config.StreamsConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds a {@link GMailConfiguration} from a typesafe {@link Config} section using its "username" and "password" keys.
+ */
+public class GMailConfigurator {
+ // Declared but not currently used by this class.
+ private final static Logger LOGGER = LoggerFactory.getLogger(GMailConfigurator.class);
+
+ public static GMailConfiguration detectConfiguration(Config gmail) {
+ // Both keys are read with Config.getString, so each must be present (ConfigException.Missing otherwise).
+ GMailConfiguration gmailConfiguration = new GMailConfiguration();
+
+ gmailConfiguration.setUserName(gmail.getString("username"));
+ gmailConfiguration.setPassword(gmail.getString("password"));
+ // NOTE(review): the password is carried as a plain String on the returned configuration object.
+ return gmailConfiguration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
new file mode 100644
index 0000000..068c214
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
@@ -0,0 +1,58 @@
+package com.google.gmail.provider;
+
+import com.googlecode.gmail4j.GmailClient;
+import com.googlecode.gmail4j.GmailMessage;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+/**
+ * Runnable that fetches messages through the provider's IMAP client and offers each one, converted to an Activity, to the provider's queue.
+ */
+public class GMailImapProviderTask implements Runnable {
+ // Declared but not currently used by this task.
+ private final static Logger LOGGER = LoggerFactory.getLogger(GMailImapProviderTask.class);
+ // Supplies the IMAP client, the output queue, and (through the serializer) the account configuration.
+ private GMailProvider provider;
+
+ public GMailImapProviderTask(GMailProvider provider) {
+ this.provider = provider;
+ }
+
+ @Override
+ public void run() {
+ // Lower bound for the DATE_GT search, set field by field on a default (lenient) GregorianCalendar.
+ Calendar calendar = new GregorianCalendar();
+
+ calendar.set(Calendar.YEAR, 2000);
+ calendar.set(Calendar.MONTH, 0);
+ calendar.set(Calendar.DAY_OF_MONTH, 0); // NOTE(review): day 0 normalizes to Dec 31, 1999 on a lenient calendar; 1 was probably intended — confirm
+ calendar.set(Calendar.HOUR, 0); // NOTE(review): HOUR is the 12-hour field and keeps the current AM_PM, so this can resolve to noon; HOUR_OF_DAY is unambiguous
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ // The bound is passed as Date.toString() text; presumably gmail4j parses that format for DATE_GT — TODO confirm.
+ final List<GmailMessage> messages = this.provider.imapClient.getMessagesBy(
+ GmailClient.EmailSearchStrategy.DATE_GT,
+ calendar.getTime().toString()
+ );
+ // No per-message error handling: an exception from deserialize ends run() and skips the remaining messages.
+ for (GmailMessage message : messages) {
+ // NOTE(review): a new serializer (with its own ObjectMapper and mix-ins) is built for every message; it could be hoisted above the loop.
+ Activity activity;
+ GMailMessageActivitySerializer serializer = new GMailMessageActivitySerializer( this.provider );
+ activity = serializer.deserialize(message);
+ StreamsDatum entry = new StreamsDatum(activity);
+ this.provider.providerQueue.offer(entry);
+ // offer's result is ignored; GMailProvider initializes providerQueue as an unbounded ConcurrentLinkedQueue.
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
new file mode 100644
index 0000000..6c49bda
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
@@ -0,0 +1,194 @@
+package com.google.gmail.provider;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.googlecode.gmail4j.GmailException;
+import com.googlecode.gmail4j.GmailMessage;
+import com.googlecode.gmail4j.javamail.JavaMailGmailMessage;
+import com.sun.mail.imap.IMAPFolder;
+import com.sun.mail.imap.IMAPMessage;
+import com.sun.mail.imap.IMAPSSLStore;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.*;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.internet.MimeMultipart;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+* Converts gmail4j {@link GmailMessage}s into Activity Streams {@link Activity} objects.
+* Only deserialize(GmailMessage) is implemented: serialize returns null and deserializeAll
+* throws NotImplementedException. deserialize reads provider.getConfig(), so an instance
+* built with the no-arg constructor throws NullPointerException there.
+* Originally created by mdelaet on 9/30/13.
+*/
+public class GMailMessageActivitySerializer implements ActivitySerializer<GmailMessage> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GMailMessageActivitySerializer.class);
+ // Owning provider; deserialize reads its config for the account user name used in activity ids.
+ GMailProvider provider;
+ // Configured with mix-ins in the constructor; only the commented-out block in deserialize would use it.
+ ObjectMapper mapper = new ObjectMapper();
+
+ public GMailMessageActivitySerializer(GMailProvider provider) {
+
+ this.provider = provider;
+ // Write objects that expose no properties instead of failing.
+ mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
+ // Apply MessageMixIn to these JavaMail/gmail4j types so the getters it declares are ignored.
+ mapper.addMixInAnnotations(IMAPSSLStore.class, MessageMixIn.class);
+ mapper.addMixInAnnotations(IMAPFolder.class, MessageMixIn.class);
+ mapper.addMixInAnnotations(IMAPMessage.class, MessageMixIn.class);
+ mapper.addMixInAnnotations(MimeMultipart.class, MessageMixIn.class);
+ mapper.addMixInAnnotations(JavaMailGmailMessage.class, MessageMixIn.class);
+
+ }
+ // NOTE(review): leaves provider null and the mapper unconfigured; deserialize() throws NullPointerException on such an instance.
+ public GMailMessageActivitySerializer() {
+ }
+
+ @Override
+ public String serializationFormat() {
+ return "gmail.v1";
+ }
+ // Not implemented: always returns null.
+ @Override
+ public GmailMessage serialize(Activity activity) {
+ return null;
+ }
+ // Maps sender -> actor, first recipient -> object, subject -> title, body text -> content.
+ @Override
+ public Activity deserialize(GmailMessage gmailMessage) {
+ // Id format: id:gmail:<userName>:<messageNumber> (see formatId).
+ Activity activity = new Activity();
+ activity.setId(formatId(this.provider.getConfig().getUserName(), String.valueOf(gmailMessage.getMessageNumber())));
+ activity.setPublished(gmailMessage.getSendDate());
+ Provider provider = new Provider();
+ provider.setId("http://gmail.com");
+ provider.setDisplayName("GMail");
+ activity.setProvider(provider);
+ Actor actor = new Actor();
+ actor.setId(gmailMessage.getFrom().getEmail());
+ actor.setDisplayName(gmailMessage.getFrom().getName());
+ activity.setActor(actor);
+ activity.setVerb("email");
+ ActivityObject object = new ActivityObject(); // NOTE(review): only GmailException is caught below; if getTo() is empty, get(0) throws IndexOutOfBoundsException
+ try {
+ object.setId(gmailMessage.getTo().get(0).getEmail());
+ object.setDisplayName(gmailMessage.getTo().get(0).getName());
+ } catch( GmailException e ) {
+ LOGGER.warn(e.getMessage());
+ }
+ activity.setTitle(gmailMessage.getSubject());
+ try {
+ activity.setContent(gmailMessage.getContentText());
+ } catch( GmailException e ) {
+ LOGGER.warn(e.getMessage());
+ }
+ activity.setObject(object);
+ // Disabled: would attach the raw GmailMessage to the activity under extensions.gmail.
+// try {
+// // if jackson can't serialize the object, find out now
+// String jsonString = mapper.writeValueAsString(gmailMessage);
+// ObjectNode jsonObject = mapper.valueToTree(gmailMessage);
+// // since it can, write the entire source object to extensions.gmail
+// Map<String, Object> extensions = Maps.newHashMap();
+// extensions.put("gmail", gmailMessage);
+// activity.setAdditionalProperty("extensions", extensions);
+// } catch (JsonProcessingException e) {
+// LOGGER.debug("Failed Json Deserialization");
+// e.printStackTrace();
+// }
+
+ return activity;
+ }
+ // Batch conversion is not supported.
+ @Override
+ public List<Activity> deserializeAll(List<GmailMessage> serializedList) {
+ throw new NotImplementedException("Not currently implemented");
+ }
+ // The ObjectNode-based helpers below are placeholders; all except buildProvider return null.
+ public Activity convert(ObjectNode event) {
+ return null;
+ }
+
+ public static Generator buildGenerator(ObjectNode event) {
+ return null;
+ }
+
+ public static Icon getIcon(ObjectNode event) {
+ return null;
+ }
+ // Note: this provider id differs from the "http://gmail.com" id that deserialize sets.
+ public static Provider buildProvider(ObjectNode event) {
+ Provider provider = new Provider();
+ provider.setId("id:providers:gmail");
+ return provider;
+ }
+
+ public static List<Object> getLinks(ObjectNode event) {
+ return null;
+ }
+
+ public static String getUrls(ObjectNode event) {
+ return null;
+ }
+ // Stores the raw message in the activity's extensions map under "gmail"; deserialize does not call this.
+ public static void addGMailExtension(Activity activity, GmailMessage gmailMessage) {
+ Map<String, Object> extensions = ensureExtensions(activity);
+ extensions.put("gmail", gmailMessage);
+ }
+ // Joins "id:gmail" and the given parts with ':', e.g. formatId("user", "42") returns "id:gmail:user:42".
+ public static String formatId(String... idparts) {
+ return Joiner.on(":").join(Lists.asList("id:gmail", idparts));
+ }
+ // Jackson mix-in: @JsonIgnore on these getters keeps the store/folder/parent references out of serialized output.
+ interface MessageMixIn {
+ @JsonManagedReference
+ @JsonIgnore
+ IMAPSSLStore getDefaultFolder(); // we don't need it!
+ @JsonManagedReference
+ @JsonIgnore
+ IMAPSSLStore getPersonalNamespaces(); // we don't need it!
+ @JsonManagedReference
+ @JsonIgnore
+ IMAPFolder getStore(); // we don't need it!
+ // @JsonManagedReference
+// @JsonIgnore
+// @JsonBackReference
+ //IMAPFolder getParent(); // we don't need it!
+ @JsonManagedReference
+ @JsonIgnore
+ @JsonBackReference
+ IMAPMessage getFolder(); // we don't need it!
+ @JsonManagedReference
+ @JsonIgnore
+ @JsonProperty("parent")
+ @JsonBackReference
+ MimeMultipart getParent();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
new file mode 100644
index 0000000..7ec157e
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -0,0 +1,131 @@
+package com.google.gmail.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gmail.GMailConfiguration;
+import com.google.gmail.GMailConfigurator;
+import com.googlecode.gmail4j.GmailClient;
+import com.googlecode.gmail4j.GmailConnection;
+import com.googlecode.gmail4j.http.HttpGmailConnection;
+import com.googlecode.gmail4j.javamail.ImapGmailClient;
+import com.googlecode.gmail4j.javamail.ImapGmailConnection;
+import com.googlecode.gmail4j.rss.RssGmailClient;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.*;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ *
+ * <p>{@link StreamsProvider} backed by a GMail account. Credentials come from a
+ * {@link GMailConfiguration}. It is either passed to a constructor or read from the
+ * {@code gmail} section of {@code StreamsConfigurator.config}.
+ *
+ * <p>{@link #prepare(Object)} builds an RSS client and an IMAP client for the configured
+ * account. {@link #startStream()} runs a {@link GMailImapProviderTask} on a new thread.
+ * {@code readCurrent}, {@code readNew} and {@code readRange} are not implemented and return
+ * {@code null}.
+ */
+public class GMailProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailProvider.class);
+
+    private GMailConfiguration config;
+
+    // Must be non-null for prepare() to succeed; not otherwise read within this class.
+    private Class klass;
+
+    public GMailConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(GMailConfiguration config) {
+        this.config = config;
+    }
+
+    // NOTE(review): raw type kept so existing callers keep compiling unchanged.
+    protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
+
+    // Datums enqueued by provider tasks (e.g. GMailRssProviderTask).
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    public BlockingQueue<Object> getInQueue() {
+        return inQueue;
+    }
+
+    protected GmailClient rssClient;
+    protected ImapGmailClient imapClient;
+
+    // Bounded pool: 5 threads, at most 20 queued tasks; overflow runs on the submitting thread.
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    /**
+     * Reads the configuration from the {@code gmail} section of the Streams config.
+     * No target class is set, so {@link #prepare(Object)} will reject this instance.
+     */
+    public GMailProvider() {
+        Config config = StreamsConfigurator.config.getConfig("gmail");
+        this.config = GMailConfigurator.detectConfiguration(config);
+    }
+
+    /**
+     * Uses the given configuration.
+     * No target class is set, so {@link #prepare(Object)} will reject this instance.
+     */
+    public GMailProvider(GMailConfiguration config) {
+        this.config = config;
+    }
+
+    /** Reads the configuration from the {@code gmail} section of the Streams config. */
+    public GMailProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("gmail");
+        this.config = GMailConfigurator.detectConfiguration(config);
+        this.klass = klass;
+    }
+
+    public GMailProvider(GMailConfiguration config, Class klass) {
+        this.config = config;
+        this.klass = klass;
+    }
+
+    /** Starts a {@link GMailImapProviderTask} on a newly created thread. */
+    @Override
+    public void startStream() {
+        new Thread(new GMailImapProviderTask(this)).start();
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    /**
+     * Validates the configuration and builds the RSS and IMAP GMail clients.
+     *
+     * @param configurationObject ignored; configuration comes from the constructor or
+     *                            {@link #setConfig(GMailConfiguration)}
+     * @throws NullPointerException if the target class, the configuration, or its user name or
+     *                              password is missing
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        Preconditions.checkNotNull(this.klass, "klass must be set; use a constructor that accepts a Class");
+        Preconditions.checkNotNull(config, "GMail configuration must not be null");
+        Preconditions.checkNotNull(config.getUserName(), "GMail userName must be configured");
+        Preconditions.checkNotNull(config.getPassword(), "GMail password must be configured");
+
+        rssClient = new RssGmailClient();
+        GmailConnection rssConnection = new HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray());
+        rssClient.setConnection(rssConnection);
+
+        imapClient = new ImapGmailClient();
+        GmailConnection imapConnection = new ImapGmailConnection();
+        imapConnection.setLoginCredentials(config.getUserName(), config.getPassword().toCharArray());
+        imapClient.setConnection(imapConnection);
+    }
+
+    /**
+     * Shuts down the executor and waits up to 5 seconds for it to terminate, then forces it.
+     */
+    @Override
+    public void cleanUp() {
+        // awaitTermination only returns early once shutdown() has been requested; without it
+        // this always blocked the full timeout and left the pool running.
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOGGER.warn("GMailProvider executor did not terminate within 5 seconds; forcing shutdown");
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            LOGGER.warn("Interrupted while waiting for GMailProvider executor to terminate", e);
+            executor.shutdownNow();
+            // Preserve the interrupt for callers further up the stack.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
new file mode 100644
index 0000000..73b6d77
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
@@ -0,0 +1,36 @@
+package com.google.gmail.provider;
+
+import com.googlecode.gmail4j.GmailMessage;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ *
+ * <p>Single-pass task: fetches the unread messages through the provider's RSS client. Each one
+ * is wrapped in a {@link StreamsDatum} and offered to the provider's queue.
+ */
+public class GMailRssProviderTask implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GMailRssProviderTask.class);
+
+    private final GMailProvider provider;
+
+    /**
+     * @param provider provider whose {@code rssClient} is read and whose {@code providerQueue}
+     *                 receives the results
+     */
+    public GMailRssProviderTask(GMailProvider provider) {
+        this.provider = provider;
+    }
+
+    @Override
+    public void run() {
+        List<GmailMessage> unread = provider.rssClient.getUnreadMessages();
+        for (GmailMessage gmailMessage : unread) {
+            provider.providerQueue.offer(new StreamsDatum(gmailMessage));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
new file mode 100644
index 0000000..b25d5e0
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/jsonschema/com/google/gmail/GMailConfiguration.json
@@ -0,0 +1,14 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "com.google.gmail.GMailConfiguration",
+ "properties": {
+ "userName": {
+ "type": "string"
+ },
+ "password": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/61592dc0/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
new file mode 100644
index 0000000..e9641fc
--- /dev/null
+++ b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
@@ -0,0 +1,53 @@
+package com.google.gmail.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sblackmon
+ * Date: 8/20/13
+ * Time: 5:57 PM
+ * To change this template use File | Settings | File Templates.
+ */
+@Ignore
+public class GMailMessageSerDeTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ @Ignore
+ @Test
+ public void Tests()
+ {
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+ InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ LOGGER.debug(line);
+
+ // implement
+ }
+ } catch( Exception e ) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+}