Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/24 16:50:45 UTC

[08/13] git commit: adding uncommitted modules

adding uncommitted modules


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/89ff615e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/89ff615e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/89ff615e

Branch: refs/heads/springcleaning
Commit: 89ff615eaa9281540f5d9187fee51ef6f0a8cfa9
Parents: 1993313
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Mar 23 18:27:20 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Mar 23 18:27:20 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-persist-hbase/pom.xml   | 109 +++
 .../apache/streams/hbase/HbaseConfigurator.java |  43 +
 .../streams/hbase/HbasePersistWriter.java       | 189 ++++
 .../streams/hbase/HbasePersistWriterTask.java   |  38 +
 .../streams/hbase/HbaseConfiguration.json       |  41 +
 .../src/main/resources/reference.properties     |  10 +
 .../provider/DatasiftEventProcessor.java        | 105 +++
 .../provider/DatasiftStreamConfigurator.java    |  26 +
 .../provider/DatasiftStreamProvider.java        | 201 +++++
 .../serializer/DatasiftActivitySerializer.java  | 181 ++++
 .../src/test/resources/part-r-00000.json        | 862 +++++++++++++++++++
 .../org/apache/streams/core/DatumStatus.java    |   7 +
 .../apache/streams/core/DatumStatusCounter.java |  41 +
 streams-runtimes/pom.xml                        |  39 +
 streams-runtimes/streams-runtime-local/pom.xml  | 103 +++
 .../jsonschema/LocalRuntimeConfiguration.json   |  12 +
 .../src/test/resources/TestFile.txt             |   4 +
 17 files changed, 2011 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/pom.xml b/streams-contrib/streams-persist-hbase/pom.xml
new file mode 100644
index 0000000..8dc9618
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-hbase</artifactId>
+
+    <repositories>
+        <repository>
+            <id>cloudera</id>
+            <url>https://repository.cloudera.com/artifactory/repo</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.0.0-cdh4.5.0</version>
+            <scope>compile</scope>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <version>0.94.6-cdh4.5.0</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>2.5.0</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.hbase.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>false</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
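
The jsonschema2pojo plugin configured above generates the HbaseConfiguration
class under target/generated-sources/jsonschema2pojo during the
generate-sources phase, so the Java sources that follow only compile after a
build of this module. A typical invocation from the repository root (the -pl
and -am flags are standard Maven; the exact goals are illustrative):

    mvn clean install -pl streams-contrib/streams-persist-hbase -am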

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
new file mode 100644
index 0000000..283307f
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
@@ -0,0 +1,43 @@
+package org.apache.streams.hbase;
+
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class HbaseConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HbaseConfigurator.class);
+
+    public static HbaseConfiguration detectConfiguration() {
+
+        Config zookeeper = StreamsConfigurator.config.getConfig("zookeeper");
+        Config hbase = StreamsConfigurator.config.getConfig("hbase");
+
+        String rootdir = hbase.getString("rootdir");
+
+        Config znode = zookeeper.getConfig("znode");
+
+        String rootserver = znode.getString("rootserver");
+        String parent = znode.getString("parent");
+        Integer clientPort = hbase.getConfig("zookeeper").getConfig("property").getInt("clientPort");
+        String quorum = hbase.getConfig("zookeeper").getString("quorum");
+
+        HbaseConfiguration hbaseConfiguration = new HbaseConfiguration();
+
+        hbaseConfiguration.setRootdir(rootdir);
+        hbaseConfiguration.setRootserver(rootserver);
+        hbaseConfiguration.setParent(parent);
+        hbaseConfiguration.setQuorum(quorum);
+        hbaseConfiguration.setClientPort(clientPort.longValue());
+        hbaseConfiguration.setTable(hbase.getString("table"));
+        hbaseConfiguration.setFamily(hbase.getString("family"));
+        hbaseConfiguration.setQualifier(hbase.getString("qualifier"));
+
+        return hbaseConfiguration;
+    }
+
+}
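
For reference, detectConfiguration() above resolves its values from the
Typesafe Config tree, so an application.conf along these lines satisfies it
(a sketch; the table, family, and qualifier values are placeholders):

    hbase {
        rootdir = "hdfs://localhost:8020/hbase"
        zookeeper {
            quorum = "localhost"
            property {
                clientPort = 2181
            }
        }
        table = "activities"
        family = "a"
        qualifier = "doc"
    }

    zookeeper {
        znode {
            parent = "/hbase"
            rootserver = "localhost"
        }
    }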

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
new file mode 100644
index 0000000..4754c0d
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
@@ -0,0 +1,189 @@
+package org.apache.streams.hbase;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.util.GuidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Closeable
+{
+    private final static Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriter.class);
+
+    protected HConnection connection;
+    protected HTablePool pool;
+    protected HTableInterface table;
+    protected HTableDescriptor descriptor;
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private HbaseConfiguration config;
+
+    public HbasePersistWriter() {
+        this.config = HbaseConfigurator.detectConfiguration();
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public HbasePersistWriter(Queue<StreamsDatum> persistQueue) {
+        this.config = HbaseConfigurator.detectConfiguration();
+        this.persistQueue = persistQueue;
+    }
+
+    private synchronized void connectToHbase()
+    {
+        Configuration configuration = new Configuration();
+        configuration.set("hbase.rootdir", config.getRootdir());
+        configuration.set("zookeeper.znode.parent", config.getParent());
+        configuration.set("zookeeper.znode.rootserver", config.getRootserver());
+        //configuration.set("hbase.master", config.getRootserver());
+        //configuration.set("hbase.cluster.distributed", "false");
+        configuration.set("hbase.zookeeper.quorum", config.getQuorum());
+        configuration.set("hbase.zookeeper.property.clientPort", Long.toString(config.getClientPort()));
+        configuration.setInt("zookeeper.session.timeout", 1000);
+
+        configuration.setInt("timeout", 1000);
+
+        //pool = new HTablePool(configuration, 10);
+        try {
+            connection = HConnectionManager.createConnection(configuration);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return;
+        }
+
+        try {
+        //    table = new HTable(configuration, config.getTable());
+        //    table = (HTable) pool.getTable(config.getTable());
+            table = new HTable(configuration, config.getTable().getBytes());
+            table.setAutoFlush(true);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return;
+        }
+
+        try {
+            descriptor = table.getTableDescriptor();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return;
+        }
+
+        LOGGER.info("Table : {}", descriptor);
+    }
+    
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+
+        ObjectNode node;
+        if( streamsDatum.getDocument() instanceof String ) {
+            try {
+                node = mapper.readValue((String)streamsDatum.getDocument(), ObjectNode.class);
+            } catch (IOException e) {
+                e.printStackTrace();
+                LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString());
+                return;
+            }
+        } else {
+            try {
+                node = mapper.valueToTree(streamsDatum.getDocument());
+            } catch (Exception e) {
+                e.printStackTrace();
+                LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString());
+                return;
+            }
+        }
+        // row key: a guid derived from the document; cell value: the serialized json
+        Put put = new Put(GuidUtils.generateGuid(node.toString()).getBytes());
+        put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), node.toString().getBytes());
+        try {
+            table.put(put);
+        } catch (IOException e) {
+            e.printStackTrace();
+            LOGGER.warn("Failure executing put: {}", streamsDatum.getDocument().toString());
+            return;
+        }
+
+    }
+
+    public void flush() throws IOException
+    {
+        table.flushCommits();
+    }
+
+    public synchronized void close() throws IOException
+    {
+        table.close();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        connectToHbase();
+
+        // drain the persist queue on a background thread; joining it here would block forever
+        Thread task = new Thread(new HbasePersistWriterTask(this));
+        task.start();
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+        try {
+            flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+    }
+}
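
A minimal harness for the writer, assuming the hbase/zookeeper config above is
on the classpath (HbaseWriterExample and the sample document are hypothetical):

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.hbase.HbasePersistWriter;

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class HbaseWriterExample {
        public static void main(String[] args) throws Exception {
            Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<StreamsDatum>();
            HbasePersistWriter writer = new HbasePersistWriter(queue);
            writer.prepare(null); // connects to HBase and starts the drain thread
            queue.offer(new StreamsDatum("{\"id\":\"1\",\"verb\":\"post\"}"));
            Thread.sleep(1000);   // give the background task a moment to drain
            writer.cleanUp();     // flush and close the table
        }
    }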

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
new file mode 100644
index 0000000..a09c8ab
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
@@ -0,0 +1,38 @@
+package org.apache.streams.hbase;
+
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HbasePersistWriterTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriterTask.class);
+
+    private HbasePersistWriter writer;
+
+    public HbasePersistWriterTask(HbasePersistWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+
+        while(true) {
+            if( writer.persistQueue.peek() != null ) {
+                try {
+                    StreamsDatum entry = writer.persistQueue.remove();
+                    writer.write(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            try {
+                Thread.sleep(1); // brief pause so an empty queue does not busy-spin
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json b/streams-contrib/streams-persist-hbase/src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json
new file mode 100644
index 0000000..d3ac8db
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/jsonschema/org/apache/streams/hbase/HbaseConfiguration.json
@@ -0,0 +1,41 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.hbase.HbaseConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "rootdir": {
+            "type": "string",
+            "description": "Hbase host"
+        },
+        "parent": {
+            "type": "string",
+            "description": "WebHdfs port"
+        },
+        "rootserver": {
+            "type": "string",
+            "description": "Base path"
+        },
+        "quorum": {
+            "type": "string",
+            "description": "User"
+        },
+        "clientPort": {
+            "type": "integer",
+            "description": "ZK Port"
+        },
+        "table": {
+            "type": "string",
+            "description": "Table"
+        },
+        "family": {
+            "type": "string",
+            "description": "Column Family"
+        },
+        "qualifier": {
+            "type": "string",
+            "description": "Qualifier"
+        }
+    }
+}
\ No newline at end of file
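
Because the pom enables generateBuilders, jsonschema2pojo also emits chainable
withX() setters for each property above, so a configuration can be assembled
inline (a sketch; the values are placeholders):

    HbaseConfiguration config = new HbaseConfiguration()
        .withRootdir("hdfs://localhost:8020/hbase")
        .withParent("/hbase")
        .withRootserver("localhost")
        .withQuorum("localhost")
        .withClientPort(2181L)
        .withTable("activities")
        .withFamily("a")
        .withQualifier("doc");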

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-persist-hbase/src/main/resources/reference.properties
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/resources/reference.properties b/streams-contrib/streams-persist-hbase/src/main/resources/reference.properties
new file mode 100644
index 0000000..699f655
--- /dev/null
+++ b/streams-contrib/streams-persist-hbase/src/main/resources/reference.properties
@@ -0,0 +1,10 @@
+hbase.rootdir = "hdfs://localhost:8020/hbase"
+
+zookeeper.znode.parent = "/hbase"
+
+zookeeper.znode.rootserver = "localhost"
+
+hbase.zookeeper.quorum = "localhost"
+
+hbase.zookeeper.property.clientPort = 2181
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
new file mode 100644
index 0000000..3c0aa8b
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
@@ -0,0 +1,105 @@
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.stream.Interaction;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class DatasiftEventProcessor implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftEventProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<Interaction> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private DatasiftActivitySerializer datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public DatasiftEventProcessor(Queue<Interaction> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public DatasiftEventProcessor(Queue<Interaction> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.outClass = outClass;
+    }
+
+    @Override
+    public void run() {
+
+        while(true) {
+            Object item;
+            try {
+                item = inQueue.poll();
+                if(item == null) {
+                    // nothing queued yet; back off briefly before polling again
+                    Thread.sleep(new Random().nextInt(100));
+                    continue;
+                }
+                if(item instanceof String && item.equals(TERMINATE)) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+
+                Datasift datasift = mapper.convertValue(item, Datasift.class);
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    outQueue.offer(new StreamsDatum(datasift.toString()));
+
+                }
+                else if( Interaction.class.equals(outClass))
+                {
+                    outQueue.offer(new StreamsDatum(item));
+                }
+                else if( Tweet.class.equals(outClass))
+                {
+                    // convert to desired format
+                    Twitter twitter = datasift.getTwitter();
+
+                    Tweet tweet = mapper.convertValue(twitter, Tweet.class);
+
+                    if( tweet != null ) {
+
+                        outQueue.offer(new StreamsDatum(tweet));
+
+                    }
+                }
+                else if( Activity.class.equals(outClass))
+                {
+                    // convert to desired format
+                    Activity out = datasiftInteractionActivitySerializer.deserialize(datasift);
+
+                    if( out != null )
+                        outQueue.offer(new StreamsDatum(out));
+                }
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
new file mode 100644
index 0000000..71c4633
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
@@ -0,0 +1,26 @@
+package org.apache.streams.datasift.provider;
+
+import com.typesafe.config.Config;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class DatasiftStreamConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamConfigurator.class);
+
+    public static DatasiftConfiguration detectConfiguration(Config datasift) {
+
+        DatasiftConfiguration datasiftConfiguration = new DatasiftConfiguration();
+
+        datasiftConfiguration.setApiKey(datasift.getString("apiKey"));
+        datasiftConfiguration.setUserName(datasift.getString("userName"));
+        datasiftConfiguration.setStreamHash(datasift.getStringList("hashes"));
+
+        return datasiftConfiguration;
+    }
+
+}
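
The Config block handed to detectConfiguration() needs exactly the three keys
read above; DatasiftStreamProvider pulls it from the "datasift" path, so an
application.conf entry looks roughly like this (credentials and hash are
placeholders):

    datasift {
        apiKey = "YOUR_API_KEY"
        userName = "YOUR_USERNAME"
        hashes = [ "stream-hash-1" ]
    }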

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
new file mode 100644
index 0000000..d339385
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -0,0 +1,201 @@
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.DataSiftClient;
+import com.datasift.client.DataSiftConfig;
+import com.datasift.client.core.Stream;
+import com.datasift.client.stream.*;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class DatasiftStreamProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamProvider.class);
+
+    protected DatasiftConfiguration config = null;
+
+    protected DataSiftClient client;
+
+    private Class klass;
+
+    public DatasiftConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(DatasiftConfiguration config) {
+        this.config = config;
+    }
+
+    protected BlockingQueue<Interaction> inQueue = new LinkedBlockingQueue<Interaction>(10000);
+
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    public BlockingQueue<Interaction> getInQueue() {
+        return inQueue;
+    }
+
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(100, 100));
+
+    protected List<String> streamHashes;
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public DatasiftStreamProvider() {
+        Config datasiftConfig = StreamsConfigurator.config.getConfig("datasift");
+        this.config = DatasiftStreamConfigurator.detectConfiguration(datasiftConfig);
+    }
+
+    public DatasiftStreamProvider(DatasiftConfiguration config) {
+        this.config = config;
+    }
+
+    public DatasiftStreamProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("datasift");
+        this.config = DatasiftStreamConfigurator.detectConfiguration(config);
+        this.klass = klass;
+    }
+
+    public DatasiftStreamProvider(DatasiftConfiguration config, Class klass) {
+        this.config = config;
+        this.klass = klass;
+    }
+
+    @Override
+    public void startStream() {
+
+        Preconditions.checkNotNull(this.klass);
+
+        Preconditions.checkNotNull(config);
+
+        Preconditions.checkNotNull(config.getStreamHash());
+
+        Preconditions.checkNotNull(config.getStreamHash().get(0));
+
+        for( String hash : config.getStreamHash()) {
+
+            client.liveStream().subscribe(new Subscription(Stream.fromString(hash)));
+
+        }
+
+        for( int i = 0; i < ((config.getStreamHash().size() / 5) + 1); i++ )
+            executor.submit(new DatasiftEventProcessor(inQueue, providerQueue, klass));
+
+    }
+
+    public void stop() {
+
+        for( String hash : config.getStreamHash()) {
+
+            client.liveStream().unsubscribe(Stream.fromString(hash));
+
+        }
+    }
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        // StreamsResultSet is assumed here to wrap a Queue<StreamsDatum>
+        return new StreamsResultSet(providerQueue);
+
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        Preconditions.checkNotNull(config);
+
+        String apiKey = config.getApiKey();
+        String userName = config.getUserName();
+
+        DataSiftConfig clientConfig = new DataSiftConfig(userName, apiKey);
+
+        client = new DataSiftClient(clientConfig);
+
+        client.liveStream().onError(new ErrorHandler());
+
+        //handle delete message
+        client.liveStream().onStreamEvent(new DeleteHandler());
+
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
+
+    public class Subscription extends StreamSubscription {
+        AtomicLong count = new AtomicLong();
+
+        public Subscription(Stream stream) {
+            super(stream);
+        }
+
+        public void onDataSiftLogMessage(DataSiftMessage di) {
+            //di.isWarning() is also available
+            System.out.println((di.isError() ? "Error" : di.isInfo() ? "Info" : "Warning") + ":\n" + di);
+        }
+
+        public void onMessage(Interaction i) {
+
+            LOGGER.debug("Processing:\n" + i);
+
+            inQueue.offer(i);
+
+            if (count.incrementAndGet() % 1000 == 0) {
+                LOGGER.info("Processed {}:\n " + count.get());
+
+            }
+
+        }
+    }
+
+    public class DeleteHandler extends StreamEventListener {
+        public void onDelete(DeletedInteraction di) {
+            //go off and delete the interaction if you have it stored. This is a strict requirement!
+            LOGGER.info("DELETED:\n " + di);
+        }
+    }
+
+    public class ErrorHandler extends ErrorListener {
+        public void exceptionCaught(Throwable t) {
+            LOGGER.warn(t.getMessage());
+            //do something useful...
+        }
+    }
+
+}
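
Wiring the provider up on its own looks roughly like this, assuming a valid
datasift block in the application config (DatasiftProviderExample is
hypothetical):

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.datasift.provider.DatasiftStreamProvider;
    import org.apache.streams.pojo.json.Activity;

    public class DatasiftProviderExample {
        public static void main(String[] args) throws Exception {
            DatasiftStreamProvider provider = new DatasiftStreamProvider(Activity.class);
            provider.prepare(null);   // builds the DataSift client and handlers
            provider.startStream();   // subscribes to each configured hash
            while (true) {
                StreamsDatum datum = provider.getProviderQueue().poll();
                if (datum != null)
                    System.out.println(datum.getDocument());
            }
        }
    }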

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89ff615e/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
new file mode 100644
index 0000000..c73abd7
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -0,0 +1,181 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.pojo.json.*;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ * Created by mdelaet on 9/30/13.
+ */
+public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
+
+    public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
+
+    ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public String serializationFormat() {
+        return "application/json+datasift.com.v1.1";
+    }
+
+    @Override
+    public Datasift serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
+    }
+
+    @Override
+    public Activity deserialize(Datasift serialized) {
+
+        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
+        mapper.setAnnotationIntrospector(introspector);
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
+
+        try {
+
+            Activity activity = convert(serialized);
+
+            return activity;
+
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<Datasift> datasifts) {
+        List<Activity> activities = Lists.newArrayList();
+        for( Datasift datasift : datasifts ) {
+            activities.add(deserialize(datasift));
+        }
+        return activities;
+    }
+
+    public static Date parse(String str) {
+        Date date;
+        String dstr;
+        DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
+        DateFormat out = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+        try {
+            date = fmt.parse(str);
+            dstr = out.format(date);
+            return out.parse(dstr);
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("Invalid date format", e);
+        }
+    }
+
+    public static Generator buildGenerator(Interaction interaction) {
+        return null;
+    }
+
+    public static Icon getIcon(Interaction interaction) {
+        return null;
+    }
+
+    public static Provider buildProvider(Interaction interaction) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:twitter");
+        return provider;
+    }
+
+    public static String getUrls(Interaction interaction) {
+        return null;
+    }
+
+    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        extensions.put("datasift", datasift);
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+    }
+
+    public Activity convert(Datasift event) {
+
+        Activity activity = new Activity();
+        activity.setActor(buildActor(event.getInteraction()));
+        activity.setVerb("post");
+        activity.setObject(buildActivityObject(event.getInteraction()));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setTarget(buildTarget(event.getInteraction()));
+        activity.setPublished(parse(event.getInteraction().getCreatedAt()));
+        activity.setGenerator(buildGenerator(event.getInteraction()));
+        activity.setIcon(getIcon(event.getInteraction()));
+        activity.setProvider(buildProvider(event.getInteraction()));
+        activity.setTitle(event.getInteraction().getTitle());
+        activity.setContent(event.getInteraction().getContent());
+        activity.setUrl(event.getInteraction().getLink());
+        activity.setLinks(getLinks(event.getInteraction()));
+        addDatasiftExtension(activity, event);
+        if( event.getInteraction().getGeo() != null) {
+            addLocationExtension(activity, event.getInteraction());
+        }
+        return activity;
+    }
+
+    public static Actor buildActor(Interaction interaction) {
+        Actor actor = new Actor();
+        actor.setId(formatId(interaction.getAuthor().getId().toString()));
+        actor.setDisplayName(interaction.getAuthor().getUsername());
+        Image image = new Image();
+        image.setUrl(interaction.getAuthor().getAvatar());
+        actor.setImage(image);
+        if (interaction.getAuthor().getLink()!=null){
+            actor.setUrl(interaction.getAuthor().getLink());
+        }
+        return actor;
+    }
+
+    public static ActivityObject buildActivityObject(Interaction interaction) {
+        ActivityObject actObj = new ActivityObject();
+        actObj.setObjectType(interaction.getContenttype());
+        return actObj;
+    }
+
+    public static List<Object> getLinks(Interaction interaction) {
+        List<Object> links = Lists.newArrayList();
+        return links;
+    }
+
+    public static ActivityObject buildTarget(Interaction interaction) {
+        return null;
+    }
+
+    public static void addLocationExtension(Activity activity, Interaction interaction) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = new HashMap<String, Object>();
+        Map<String, Double> coordinates = new HashMap<String, Double>();
+        coordinates.put("latitude", interaction.getGeo().getLatitude());
+        coordinates.put("longitude", interaction.getGeo().getLongitude());
+        location.put("coordinates", coordinates);
+        extensions.put("location", location);
+    }
+
+}
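
Typical use mirrors DatasiftEventProcessor: bind the raw interaction json to
Datasift with Jackson, then hand it to the serializer (a sketch; the json
variable stands in for one line of the part-r-00000.json fixture):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.streams.datasift.Datasift;
    import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
    import org.apache.streams.pojo.json.Activity;

    public class SerializerExample {
        public static void main(String[] args) throws Exception {
            String json = args[0]; // one datasift interaction document
            Datasift datasift = new ObjectMapper().readValue(json, Datasift.class);
            Activity activity = new DatasiftActivitySerializer().deserialize(datasift);
            System.out.println(activity.getId()); // e.g. id:datasift:post:<interaction id>
        }
    }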