You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 22:10:22 UTC

[07/14] git commit: adding push provider, reverting change to streams provider

adding push provider, reverting change to streams provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f272ff53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f272ff53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f272ff53

Branch: refs/heads/master
Commit: f272ff5347f0a00b24402960b251c143e966f34f
Parents: eda5a3c
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Jul 27 19:13:25 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 154 +++++++++++++++++++
 .../com/datasift/DatasiftPushConfiguration.json |  17 ++
 .../datasift/DatasiftStreamConfiguration.json   |  17 ++
 3 files changed, 188 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
new file mode 100644
index 0000000..196f504
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -0,0 +1,154 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.DataSiftClient;
+import com.datasift.client.stream.DeletedInteraction;
+import com.datasift.client.stream.Interaction;
+import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Requires Java Version 1.7!
+ * {@code DatasiftStreamProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface.  The provider
+ * uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
+ */
+public class DatasiftPushProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
+
+    private DatasiftConfiguration config;
+    protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
+    private Map<String, DataSiftClient> clients;
+    private StreamEventListener eventListener;
+    private ObjectMapper mapper;
+
+    public DatasiftPushProvider() {
+
+    }
+
+    // to set up a webhook we need to be able to return a reference to this queue
+    public Queue<Interaction> getInteractions() {
+        return interactions;
+    }
+
+    @Override
+    public void startStream() {
+
+        Preconditions.checkNotNull(this.config);
+        Preconditions.checkNotNull(this.config.getApiKey());
+        Preconditions.checkNotNull(this.config.getUserName());
+
+    }
+
+    /**
+     * Shuts down all open streams from datasift.
+     */
+    public void stop() {
+    }
+
+    // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
+    @Override
+    //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
+    public StreamsResultSet readCurrent() {
+        Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
+        StreamsDatum datum = null;
+        Interaction interaction;
+        while (!this.interactions.isEmpty()) {
+            interaction = this.interactions.poll();
+            try {
+                datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue());
+            } catch (JsonProcessingException jpe) {
+                LOGGER.error("Exception while converting Interaction to String : {}", jpe);
+            }
+            if (datum != null) {
+                while (!datums.offer(datum)) {
+                    Thread.yield();
+                }
+            }
+
+        }
+        return new StreamsResultSet(datums);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.clients != null && this.clients.size() > 0;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.interactions = new ConcurrentLinkedQueue<Interaction>();
+        this.clients = Maps.newHashMap();
+        this.mapper = StreamsJacksonMapper.getInstance();
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
+
+    public DatasiftConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(DatasiftConfiguration config) {
+        this.config = config;
+    }
+
+
+    /**
+     * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST
+     * DELETE TWEETS FROM THEIR DATA STORE IF THEY RECEIVE A DELETE NOTICE.
+     */
+    public static class DeleteHandler extends StreamEventListener {
+
+        public void onDelete(DeletedInteraction di) {
+            //go off and delete the interaction if you have it stored. This is a strict requirement!
+            LOGGER.info("DELETED:\n " + di);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
new file mode 100644
index 0000000..bb65ef0
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.datasift.DatasiftPushConfiguration",
+    "extends": {"$ref":"DatasiftConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "streamHash": {
+            "type": "array",
+            "minItems": 1,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
new file mode 100644
index 0000000..91a9974
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.datasift.DatasiftStreamConfiguration",
+    "extends": {"$ref":"DatasiftConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "streamHash": {
+            "type": "array",
+            "minItems": 1,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file