You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 22:10:23 UTC

[08/14] git commit: simplify/isolate push provider

simplify/isolate push provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2e66e51a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2e66e51a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2e66e51a

Branch: refs/heads/master
Commit: 2e66e51a9fbef1e1fb7ae922a37d94d38dcf254a
Parents: f272ff5
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:22:22 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 61 +++++++-------------
 1 file changed, 20 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2e66e51a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 196f504..264dbbe 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -18,28 +18,22 @@ under the License.
 */
 package org.apache.streams.datasift.provider;
 
-import com.datasift.client.DataSiftClient;
 import com.datasift.client.stream.DeletedInteraction;
-import com.datasift.client.stream.Interaction;
 import com.datasift.client.stream.StreamEventListener;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Requires Java Version 1.7!
@@ -51,27 +45,17 @@ public class DatasiftPushProvider implements StreamsProvider {
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
 
     private DatasiftConfiguration config;
-    protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
-    private Map<String, DataSiftClient> clients;
-    private StreamEventListener eventListener;
-    private ObjectMapper mapper;
+    protected Queue<StreamsDatum> providerQueue;
 
-    public DatasiftPushProvider() {
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    }
+    public DatasiftPushProvider() {
 
-    // to set up a webhook we need to be able to return a reference to this queue
-    public Queue<Interaction> getInteractions() {
-        return interactions;
     }
 
     @Override
     public void startStream() {
-
-        Preconditions.checkNotNull(this.config);
-        Preconditions.checkNotNull(this.config.getApiKey());
-        Preconditions.checkNotNull(this.config.getUserName());
-
+        Preconditions.checkNotNull(providerQueue);
     }
 
     /**
@@ -85,23 +69,17 @@ public class DatasiftPushProvider implements StreamsProvider {
     //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
     public StreamsResultSet readCurrent() {
         Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
-        StreamsDatum datum = null;
-        Interaction interaction;
-        while (!this.interactions.isEmpty()) {
-            interaction = this.interactions.poll();
-            try {
-                datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue());
-            } catch (JsonProcessingException jpe) {
-                LOGGER.error("Exception while converting Interaction to String : {}", jpe);
-            }
-            if (datum != null) {
-                while (!datums.offer(datum)) {
-                    Thread.yield();
-                }
-            }
 
+        StreamsResultSet current = new StreamsResultSet(datums);
+        try {
+            lock.writeLock().lock();
+            current = new StreamsResultSet(providerQueue);
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
         }
-        return new StreamsResultSet(datums);
+
+        return current;
     }
 
     @Override
@@ -115,14 +93,12 @@ public class DatasiftPushProvider implements StreamsProvider {
 
     @Override
     public boolean isRunning() {
-        return this.clients != null && this.clients.size() > 0;
+        return true;
     }
 
     @Override
     public void prepare(Object configurationObject) {
-        this.interactions = new ConcurrentLinkedQueue<Interaction>();
-        this.clients = Maps.newHashMap();
-        this.mapper = StreamsJacksonMapper.getInstance();
+        this.providerQueue = constructQueue();
     }
 
     @Override
@@ -138,6 +114,9 @@ public class DatasiftPushProvider implements StreamsProvider {
         this.config = config;
     }
 
+    private Queue<StreamsDatum> constructQueue() {
+        return Queues.newConcurrentLinkedQueue();
+    }
 
     /**
      * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST