You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 16:50:45 UTC
[08/13] git commit: adding uncommitted modules
adding uncommitted modules
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/89ff615e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/89ff615e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/89ff615e
Branch: refs/heads/springcleaning
Commit: 89ff615eaa9281540f5d9187fee51ef6f0a8cfa9
Parents: 1993313
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Mar 23 18:27:20 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Mar 23 18:27:20 2014 -0500
----------------------------------------------------------------------
streams-contrib/streams-persist-hbase/pom.xml | 109 +++
.../apache/streams/hbase/HbaseConfigurator.java | 43 +
.../streams/hbase/HbasePersistWriter.java | 189 ++++
.../streams/hbase/HbasePersistWriterTask.java | 38 +
.../streams/hbase/HbaseConfiguration.json | 41 +
.../src/main/resources/reference.properties | 10 +
.../provider/DatasiftEventProcessor.java | 105 +++
.../provider/DatasiftStreamConfigurator.java | 26 +
.../provider/DatasiftStreamProvider.java | 201 +++++
.../serializer/DatasiftActivitySerializer.java | 181 ++++
.../src/test/resources/part-r-00000.json | 862 +++++++++++++++++++
.../org/apache/streams/core/DatumStatus.java | 7 +
.../apache/streams/core/DatumStatusCounter.java | 41 +
streams-runtimes/pom.xml | 39 +
streams-runtimes/streams-runtime-local/pom.xml | 103 +++
.../jsonschema/LocalRuntimeConfiguration.json | 12 +
.../src/test/resources/TestFile.txt | 4 +
17 files changed, 2011 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/pom.xml b/streams-contrib/streams-persist-hbase/pom.xml
new file mode 100644
index 0000000..8dc9618
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-contrib</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-hbase</artifactId>
+
+ <repositories>
+ <repository>
+ <id>cloudera</id>
+ <url>https://repository.cloudera.com/artifactory/repo</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.0.0-cdh4.5.0</version>
+ <scope>compile</scope>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>0.94.6-cdh4.5.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.5.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.hbase.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
new file mode 100644
index 0000000..283307f
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
@@ -0,0 +1,43 @@
+package org.apache.streams.hbase;
+
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class HbaseConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HbaseConfigurator.class);
+
+    /**
+     * Reads the "zookeeper" and "hbase" sections of the global Streams config
+     * and maps them onto a new {@link HbaseConfiguration}.
+     *
+     * @return a freshly populated HbaseConfiguration
+     */
+    public static HbaseConfiguration detectConfiguration() {
+
+        Config zookeeperConfig = StreamsConfigurator.config.getConfig("zookeeper");
+        Config hbaseConfig = StreamsConfigurator.config.getConfig("hbase");
+
+        Config znodeConfig = zookeeperConfig.getConfig("znode");
+        Config hbaseZookeeper = hbaseConfig.getConfig("zookeeper");
+
+        HbaseConfiguration result = new HbaseConfiguration();
+        result.setRootdir(hbaseConfig.getString("rootdir"));
+        result.setRootserver(znodeConfig.getString("rootserver"));
+        result.setParent(znodeConfig.getString("parent"));
+        result.setQuorum(hbaseZookeeper.getString("quorum"));
+        // the generated POJO declares clientPort as a Long
+        result.setClientPort((long) hbaseZookeeper.getConfig("property").getInt("clientPort"));
+        result.setTable(hbaseConfig.getString("table"));
+        result.setFamily(hbaseConfig.getString("family"));
+        result.setQualifier(hbaseConfig.getString("qualifier"));
+
+        return result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
new file mode 100644
index 0000000..4754c0d
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
@@ -0,0 +1,189 @@
+package org.apache.streams.hbase;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.util.GuidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Closeable
+{
+ private final static Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriter.class);
+
+ protected HConnection connection;
+ protected HTablePool pool;
+ protected HTableInterface table;
+ protected HTableDescriptor descriptor;
+
+ protected volatile Queue<StreamsDatum> persistQueue;
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ private HbaseConfiguration config;
+
+ public HbasePersistWriter() {
+ this.config = HbaseConfigurator.detectConfiguration();
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
+
+ public HbasePersistWriter(Queue<StreamsDatum> persistQueue) {
+ this.config = HbaseConfigurator.detectConfiguration();
+ this.persistQueue = persistQueue;
+ }
+
+ private synchronized void connectToHbase()
+ {
+ Configuration configuration = new Configuration();
+ configuration.set("hbase.rootdir", config.getRootdir());
+ configuration.set("zookeeper.znode.parent", config.getParent());
+ configuration.set("zookeeper.znode.rootserver", config.getRootserver());
+ //configuration.set("hbase.master", config.getRootserver());
+ //configuration.set("hbase.cluster.distributed", "false");
+ configuration.set("hbase.zookeeper.quorum", config.getQuorum());
+ configuration.set("hbase.zookeeper.property.clientPort", Long.toString(config.getClientPort()));
+ configuration.setInt("zookeeper.session.timeout", 1000);
+
+ configuration.setInt("timeout", 1000);
+
+ //pool = new HTablePool(configuration, 10);
+ try {
+ connection = HConnectionManager.createConnection(configuration);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+
+ try {
+ // table = new HTable(configuration, config.getTable());
+ // table = (HTable) pool.getTable(config.getTable());
+ table = new HTable(configuration, config.getTable().getBytes());
+ table.setAutoFlush(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ //
+
+ try {
+ descriptor = table.getTableDescriptor();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+
+ try
+ {
+ LOGGER.info("Table : {}", descriptor);
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("There was an error connecting to HBase, please check your settings and try again");
+ e.printStackTrace();
+ return;
+ }
+ }
+
+ @Override
+ public void write(StreamsDatum streamsDatum) {
+
+ ObjectNode node;
+ Put put = new Put();
+ if( streamsDatum.getDocument() instanceof String ) {
+ try {
+ node = mapper.readValue((String)streamsDatum.getDocument(), ObjectNode.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString());
+ return;
+ }
+ put.setId(GuidUtils.generateGuid(node.toString()));
+ try {
+ byte[] value = node.binaryValue();
+ put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value);
+ } catch (IOException e) {
+ e.printStackTrace();
+ LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString());
+ return;
+ }
+ } else {
+ try {
+ node = mapper.valueToTree(streamsDatum.getDocument());
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString());
+ return;
+ }
+ put.setId(GuidUtils.generateGuid(node.toString()));
+ try {
+ byte[] value = node.binaryValue();
+ put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value);
+ } catch (IOException e) {
+ e.printStackTrace();
+ LOGGER.warn("Failure preparing put: {}", streamsDatum.getDocument().toString());
+ return;
+ }
+ }
+ try {
+ table.put(put);
+ } catch (IOException e) {
+ e.printStackTrace();
+ LOGGER.warn("Failure executin put: {}", streamsDatum.getDocument().toString());
+ return;
+ }
+
+ }
+
+ public void flush() throws IOException
+ {
+ table.flushCommits();
+ }
+
+ public synchronized void close() throws IOException
+ {
+ table.close();
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ connectToHbase();
+
+ Thread task = new Thread(new HbasePersistWriterTask(this));
+ task.start();
+
+ try {
+ task.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ try {
+ flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
new file mode 100644
index 0000000..a09c8ab
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
@@ -0,0 +1,38 @@
+package org.apache.streams.hbase;
+
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * Background task that drains an {@link HbasePersistWriter}'s persist queue,
+ * writing each datum to HBase.  Runs until the thread is interrupted.
+ */
+public class HbasePersistWriterTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriterTask.class);
+
+    private HbasePersistWriter writer;
+
+    public HbasePersistWriterTask(HbasePersistWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+
+        while( !Thread.currentThread().isInterrupted() ) {
+            // poll() is atomic on the concurrent queue, unlike the original
+            // peek()-then-remove() pair
+            StreamsDatum entry = writer.persistQueue.poll();
+            if( entry != null ) {
+                try {
+                    writer.write(entry);
+                } catch (Exception e) {
+                    LOGGER.warn("Failure writing datum", e);
+                }
+            } else {
+                // BUGFIX: the original slept new Random().nextInt(1) ms,
+                // which is always 0 and busy-spins; back off briefly when
+                // the queue is empty, and stop when interrupted.
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json b/streams-contrib/streams-persist-hbase/src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json
new file mode 100644
index 0000000..d3ac8db
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json
@@ -0,0 +1,41 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.hbase.HbaseConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "rootdir": {
+ "type": "string",
+ "description": "Hbase host"
+ },
+ "parent": {
+ "type": "string",
+ "description": "WebHdfs port"
+ },
+ "rootserver": {
+ "type": "string",
+ "description": "Base path"
+ },
+ "quorum": {
+ "type": "string",
+ "description": "User"
+ },
+ "clientPort": {
+ "type": "integer",
+ "description": "ZK Port"
+ },
+ "table": {
+ "type": "string",
+ "description": "Table"
+ },
+ "family": {
+ "type": "string",
+ "description": "Column Family"
+ },
+ "qualifier": {
+ "type": "string",
+ "description": "Qualifier"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/resources/reference.properties
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/resources/reference.properties b/streams-contrib/streams-persist-hbase/src/main/resources/reference.properties
new file mode 100644
index 0000000..699f655
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/resources/reference.properties
@@ -0,0 +1,10 @@
+hbase.rootdir = "hdfs://localhost:8020/hbase"
+
+zookeeper.znode.parent = "/hbase"
+
+zookeeper.znode.rootserver = "localhost"
+
+hbase.zookeeper.quorum = "localhost"
+
+hbase.zookeeper.property.clientPort = 2181
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
new file mode 100644
index 0000000..3c0aa8b
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
@@ -0,0 +1,105 @@
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.stream.Interaction;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class DatasiftEventProcessor implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftEventProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<Interaction> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private DatasiftActivitySerializer datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
+
+    // sentinel placed on the in-queue to shut the processor down
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public DatasiftEventProcessor(Queue<Interaction> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public DatasiftEventProcessor(Queue<Interaction> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.outClass = outClass;
+    }
+
+    /**
+     * Polls interactions off the in-queue, converts each to the requested
+     * output class (String, Interaction, Tweet, or Activity), and offers the
+     * result to the out-queue.  Stops when the TERMINATE sentinel is seen.
+     */
+    @Override
+    public void run() {
+
+        while(true) {
+            try {
+                Object item = inQueue.poll();
+
+                // BUGFIX: poll() returns null on an empty queue; the original
+                // fell through and NPE'd converting/printing null.  Back off
+                // briefly and retry instead.
+                if( item == null ) {
+                    Thread.sleep(100);
+                    continue;
+                }
+
+                if(item instanceof String && item.equals(TERMINATE)) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+
+                Datasift datasift = mapper.convertValue(item, Datasift.class);
+
+                if( String.class.equals(outClass)) {
+                    // target is string: just pass through
+                    outQueue.offer(new StreamsDatum(datasift.toString()));
+                }
+                else if( Interaction.class.equals(outClass))
+                {
+                    outQueue.offer(new StreamsDatum(item));
+                }
+                else if( Tweet.class.equals(outClass))
+                {
+                    // convert to desired format
+                    Twitter twitter = datasift.getTwitter();
+                    Tweet tweet = mapper.convertValue(twitter, Tweet.class);
+                    if( tweet != null ) {
+                        outQueue.offer(new StreamsDatum(tweet));
+                    }
+                }
+                else if( Activity.class.equals(outClass))
+                {
+                    // convert to desired format
+                    Activity out = datasiftInteractionActivitySerializer.deserialize(datasift);
+                    if( out != null )
+                        outQueue.offer(new StreamsDatum(out));
+                }
+
+            } catch (Exception e) {
+                LOGGER.warn("Failure processing interaction", e);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
new file mode 100644
index 0000000..71c4633
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
@@ -0,0 +1,26 @@
+package org.apache.streams.datasift.provider;
+
+import com.typesafe.config.Config;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class DatasiftStreamConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamConfigurator.class);
+
+    /**
+     * Maps the supplied "datasift" config subtree onto a new
+     * {@link DatasiftConfiguration}.
+     *
+     * @param datasift config section holding apiKey, userName, and hashes
+     * @return a populated DatasiftConfiguration
+     */
+    public static DatasiftConfiguration detectConfiguration(Config datasift) {
+
+        DatasiftConfiguration result = new DatasiftConfiguration();
+        result.setApiKey(datasift.getString("apiKey"));
+        result.setUserName(datasift.getString("userName"));
+        result.setStreamHash(datasift.getStringList("hashes"));
+        return result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
new file mode 100644
index 0000000..d339385
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -0,0 +1,201 @@
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.DataSiftClient;
+import com.datasift.client.DataSiftConfig;
+import com.datasift.client.core.Stream;
+import com.datasift.client.stream.*;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+/**
+ * StreamsProvider that subscribes to DataSift live streams and feeds received
+ * interactions through DatasiftEventProcessor workers into a provider queue.
+ */
+public class DatasiftStreamProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamProvider.class);
+
+    protected DatasiftConfiguration config = null;
+
+    protected DataSiftClient client;
+
+    // output class the event processors should convert interactions to
+    private Class klass;
+
+    public DatasiftConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(DatasiftConfiguration config) {
+        this.config = config;
+    }
+
+    protected BlockingQueue inQueue = new LinkedBlockingQueue<Interaction>(10000);
+
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    public BlockingQueue<Object> getInQueue() {
+        return inQueue;
+    }
+
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(100, 100));
+
+    protected List<String> streamHashes;
+
+    // bounded pool whose rejection policy runs tasks on the caller thread
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public DatasiftStreamProvider() {
+        Config datasiftConfig = StreamsConfigurator.config.getConfig("datasift");
+        this.config = DatasiftStreamConfigurator.detectConfiguration(datasiftConfig);
+    }
+
+    public DatasiftStreamProvider(DatasiftConfiguration config) {
+        this.config = config;
+    }
+
+    public DatasiftStreamProvider(Class klass) {
+        Config datasiftConfig = StreamsConfigurator.config.getConfig("datasift");
+        this.config = DatasiftStreamConfigurator.detectConfiguration(datasiftConfig);
+        this.klass = klass;
+    }
+
+    public DatasiftStreamProvider(DatasiftConfiguration config, Class klass) {
+        this.config = config;
+        this.klass = klass;
+    }
+
+    /**
+     * Subscribes to every configured stream hash and starts one event
+     * processor per five hashes.
+     */
+    @Override
+    public void startStream() {
+
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getStreamHash());
+        Preconditions.checkNotNull(config.getStreamHash().get(0));
+
+        for( String hash : config.getStreamHash()) {
+            client.liveStream().subscribe(new Subscription(Stream.fromString(hash)));
+        }
+
+        for( int i = 0; i < ((config.getStreamHash().size() / 5) + 1); i++ )
+            executor.submit(new DatasiftEventProcessor(inQueue, providerQueue, klass));
+
+    }
+
+    public void stop() {
+
+        // BUGFIX: the original re-subscribed here (copy/paste of
+        // startStream); shutting down must unsubscribe each stream instead.
+        for( String hash : config.getStreamHash()) {
+            client.liveStream().unsubscribe(Stream.fromString(hash));
+        }
+    }
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        // NOTE(review): providerQueue is a ConcurrentLinkedQueue; this cast
+        // can only succeed if StreamsResultSet is itself a queue type.
+        // TODO: build a StreamsResultSet from providerQueue instead of casting.
+        return (StreamsResultSet) providerQueue;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    /**
+     * Builds the DataSift client from the configured credentials and wires
+     * up the error / delete handlers.
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        Preconditions.checkNotNull(config);
+
+        String apiKey = config.getApiKey();
+        String userName = config.getUserName();
+
+        // renamed from "config" so the provider's own field is not shadowed
+        DataSiftConfig clientConfig = new DataSiftConfig(userName, apiKey);
+
+        client = new DataSiftClient(clientConfig);
+
+        client.liveStream().onError(new ErrorHandler());
+
+        // handle delete messages
+        client.liveStream().onStreamEvent(new DeleteHandler());
+
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
+
+    public class Subscription extends StreamSubscription {
+        AtomicLong count = new AtomicLong();
+
+        public Subscription(Stream stream) {
+            super(stream);
+        }
+
+        public void onDataSiftLogMessage(DataSiftMessage di) {
+            // di.isWarning() is also available
+            LOGGER.info("{}:\n{}", (di.isError() ? "Error" : di.isInfo() ? "Info" : "Warning"), di);
+        }
+
+        public void onMessage(Interaction i) {
+
+            LOGGER.debug("Processing:\n{}", i);
+
+            inQueue.offer(i);
+
+            // BUGFIX: original mixed a "{}" placeholder with concatenation
+            if (count.incrementAndGet() % 1000 == 0) {
+                LOGGER.info("Processed {}", count.get());
+            }
+
+        }
+    }
+
+    public class DeleteHandler extends StreamEventListener {
+        public void onDelete(DeletedInteraction di) {
+            // go off and delete the interaction if you have it stored.
+            // This is a strict requirement!
+            LOGGER.info("DELETED:\n {}", di);
+        }
+    }
+
+    public class ErrorHandler extends ErrorListener {
+        public void exceptionCaught(Throwable t) {
+            // keep the stack trace, not just the message
+            LOGGER.warn(t.getMessage(), t);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
new file mode 100644
index 0000000..c73abd7
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -0,0 +1,181 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.pojo.json.*;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+* Created with IntelliJ IDEA.
+* User: mdelaet
+* Date: 9/30/13
+* Time: 9:24 AM
+* To change this template use File | Settings | File Templates.
+*/
+public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
+
+ public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public String serializationFormat() {
+ return "application/json+datasift.com.v1.1";
+ }
+
+ @Override
+ public Datasift serialize(Activity deserialized) {
+ throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
+ }
+
+ @Override
+ public Activity deserialize(Datasift serialized) {
+
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
+ mapper.setAnnotationIntrospector(introspector);
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
+
+ try {
+
+ Activity activity = convert(serialized);
+
+ return activity;
+
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Unable to deserialize", e);
+ }
+
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<Datasift> datasifts) {
+ List<Activity> activities = Lists.newArrayList();
+ for( Datasift datasift : datasifts ) {
+ activities.add(deserialize(datasift));
+ }
+ return activities;
+ }
+
+ public static Date parse(String str) {
+ Date date;
+ String dstr;
+ DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
+ DateFormat out = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+ try {
+ date = fmt.parse(str);
+ dstr = out.format(date);
+ return out.parse(dstr);
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Invalid date format", e);
+ }
+ }
+
+ public static Generator buildGenerator(Interaction interaction) {
+ return null;
+ }
+
+ public static Icon getIcon(Interaction interaction) {
+ return null;
+ }
+
+ public static Provider buildProvider(Interaction interaction) {
+ Provider provider = new Provider();
+ provider.setId("id:providers:twitter");
+ return provider;
+ }
+
+ public static String getUrls(Interaction interaction) {
+ return null;
+ }
+
+ public static void addDatasiftExtension(Activity activity, Datasift datasift) {
+ Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+ extensions.put("datasift", datasift);
+ }
+
+ public static String formatId(String... idparts) {
+ return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+ }
+
+ public Activity convert(Datasift event) {
+
+ Activity activity = new Activity();
+ activity.setActor(buildActor(event.getInteraction()));
+ activity.setVerb("post");
+ activity.setObject(buildActivityObject(event.getInteraction()));
+ activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+ activity.setTarget(buildTarget(event.getInteraction()));
+ activity.setPublished(parse(event.getInteraction().getCreatedAt()));
+ activity.setGenerator(buildGenerator(event.getInteraction()));
+ activity.setIcon(getIcon(event.getInteraction()));
+ activity.setProvider(buildProvider(event.getInteraction()));
+ activity.setTitle(event.getInteraction().getTitle());
+ activity.setContent(event.getInteraction().getContent());
+ activity.setUrl(event.getInteraction().getLink());
+ activity.setLinks(getLinks(event.getInteraction()));
+ addDatasiftExtension(activity, event);
+ if( event.getInteraction().getGeo() != null) {
+ addLocationExtension(activity, event.getInteraction());
+ }
+ return activity;
+ }
+
+ public static Actor buildActor(Interaction interaction) {
+ Actor actor = new Actor();
+ actor.setId(formatId(interaction.getAuthor().getId().toString()));
+ actor.setDisplayName(interaction.getAuthor().getUsername());
+ Image image = new Image();
+ image.setUrl(interaction.getAuthor().getAvatar());
+ actor.setImage(image);
+ if (interaction.getAuthor().getLink()!=null){
+ actor.setUrl(interaction.getAuthor().getLink());
+ }
+ return actor;
+ }
+
+ public static ActivityObject buildActivityObject(Interaction interaction) {
+ ActivityObject actObj = new ActivityObject();
+ actObj.setObjectType(interaction.getContenttype());
+ return actObj;
+ }
+
+ public static List<Object> getLinks(Interaction interaction) {
+ List<Object> links = Lists.newArrayList();
+ return links;
+ }
+
+ public static ActivityObject buildTarget(Interaction interaction) {
+ return null;
+ }
+
+ public static void addLocationExtension(Activity activity, Interaction interaction) {
+ Map<String, Object> extensions = ensureExtensions(activity);
+ Map<String, Object> location = new HashMap<String, Object>();
+ Map<String, Double> coordinates = new HashMap<String, Double>();
+ coordinates.put("latitude", interaction.getGeo().getLatitude());
+ coordinates.put("longitude", interaction.getGeo().getLongitude());
+ location.put("coordinates", coordinates);
+ extensions.put("location", location);
+ }
+
+}