You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/21 20:35:11 UTC

[4/9] incubator-streams git commit: refactored DatasiftProvider to Resource

refactored DatasiftProvider to Resource


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbb69526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbb69526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbb69526

Branch: refs/heads/STREAMS-222
Commit: dbb69526e7469f6b988e9ed35793616d988102f6
Parents: dbda9ed
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 15:30:15 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 15:30:15 2014 -0600

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 196 ++++++++++++++-----
 .../streams/datasift/DatasiftWebhookData.json   |  30 +++
 2 files changed, 180 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbb69526/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 264dbbe..2ab949f 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -20,66 +20,193 @@ package org.apache.streams.datasift.provider;
 
 import com.datasift.client.stream.DeletedInteraction;
 import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.datasift.DatasiftWebhookData;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Resource;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import java.math.BigInteger;
+import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
 
 /**
  * Requires Java Version 1.7!
  * {@code DatasiftStreamProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface.  The provider
  * uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
  */
+@Resource
+@Path("/streams/webhooks/datasift")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
 public class DatasiftPushProvider implements StreamsProvider {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
 
-    private DatasiftConfiguration config;
-    protected Queue<StreamsDatum> providerQueue;
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
 
     protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public DatasiftPushProvider() {
+    private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+    @POST
+    @Path("json")
+    public Response json(@Context HttpHeaders headers,
+                         String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        StreamsDatum datum = new StreamsDatum(body);
+
+        lock.writeLock().lock();
+        ComponentUtils.offerUntilSuccess(datum, providerQueue);
+        lock.writeLock().unlock();
+
+        Boolean success = true;
+
+        response.put("success", success);
+
+        return Response.status(200).entity(response).build();
 
     }
 
-    @Override
-    public void startStream() {
-        Preconditions.checkNotNull(providerQueue);
+    @POST
+    @Path("json_new_line")
+    public Response json_new_line(@Context HttpHeaders headers,
+                                  String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        if (body.equalsIgnoreCase("{}")) {
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            for( String item : Splitter.on(newLinePattern).split(body)) {
+                StreamsDatum datum = new StreamsDatum(item);
+
+                lock.writeLock().lock();
+                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                lock.writeLock().unlock();
+
+            }
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            LOGGER.warn(e.toString(), e);
+
+            Boolean success = false;
+
+            response.put("success", success);
+
+            return Response.status(500).entity(response).build();
+
+        }
+
     }
 
-    /**
-     * Shuts down all open streams from datasift.
-     */
-    public void stop() {
+    @POST
+    @Path("json_meta")
+    public Response json_meta(@Context HttpHeaders headers,
+                              String body) {
+
+        //log.debug(headers.toString(), headers);
+
+        //log.debug(body.toString(), body);
+
+        ObjectNode response = mapper.createObjectNode();
+
+        if (body.equalsIgnoreCase("{}")) {
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            DatasiftWebhookData objectWrapper = mapper.readValue(body, DatasiftWebhookData.class);
+
+            for( Datasift item : objectWrapper.getInteractions()) {
+
+                String json = mapper.writeValueAsString(item);
+
+                StreamsDatum datum = new StreamsDatum(json);
+
+                lock.writeLock().lock();
+                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                lock.writeLock().unlock();
+            }
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            LOGGER.warn(e.toString(), e);
+        }
+
+        return Response.status(500).build();
+    }
+
+    @Override
+    public void startStream() {
+        return;
     }
 
-    // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
     @Override
-    //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
     public StreamsResultSet readCurrent() {
-        Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
 
-        StreamsResultSet current = new StreamsResultSet(datums);
-        try {
-            lock.writeLock().lock();
-            current = new StreamsResultSet(providerQueue);
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
+        StreamsResultSet current;
+
+        lock.writeLock().lock();
+        current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+        providerQueue.clear();
+        lock.writeLock().unlock();
 
         return current;
+
     }
 
     @Override
@@ -87,6 +214,7 @@ public class DatasiftPushProvider implements StreamsProvider {
         return null;
     }
 
+    @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         return null;
     }
@@ -98,36 +226,12 @@ public class DatasiftPushProvider implements StreamsProvider {
 
     @Override
     public void prepare(Object configurationObject) {
-        this.providerQueue = constructQueue();
+
     }
 
     @Override
     public void cleanUp() {
-        stop();
-    }
 
-    public DatasiftConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(DatasiftConfiguration config) {
-        this.config = config;
-    }
-
-    private Queue<StreamsDatum> constructQueue() {
-        return Queues.newConcurrentLinkedQueue();
-    }
-
-    /**
-     * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST
-     * DELETE TWEETS FROM THEIR DATA STORE IF THEY RECEIVE A DELETE NOTICE.
-     */
-    public static class DeleteHandler extends StreamEventListener {
-
-        public void onDelete(DeletedInteraction di) {
-            //go off and delete the interaction if you have it stored. This is a strict requirement!
-            LOGGER.info("DELETED:\n " + di);
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbb69526/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
new file mode 100644
index 0000000..c4e43e8
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
@@ -0,0 +1,30 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "javaType": "org.apache.streams.datasift.DatasiftWebhookData",
+    "properties": {
+        "id": {
+            "type": "string"
+        },
+        "hash": {
+            "type": "string"
+        },
+        "hash_type": {
+            "type": "string"
+        },
+        "count": {
+            "type": "long"
+        },
+        "delivered_at": {
+            "type": "string",
+            "format": "date-time"
+        },
+        "interactions": {
+            "type": "array",
+            "items": {
+                "type": "object",
+                "javaType": "org.apache.streams.datasift.Datasift"
+            }
+        }
+    }
+}
\ No newline at end of file