You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 22:10:23 UTC
[08/14] git commit: simplify/isolate push provider
simplify/isolate push provider
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2e66e51a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2e66e51a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2e66e51a
Branch: refs/heads/master
Commit: 2e66e51a9fbef1e1fb7ae922a37d94d38dcf254a
Parents: f272ff5
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:22:22 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500
----------------------------------------------------------------------
.../datasift/provider/DatasiftPushProvider.java | 61 +++++++-------------
1 file changed, 20 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2e66e51a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 196f504..264dbbe 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -18,28 +18,22 @@ under the License.
*/
package org.apache.streams.datasift.provider;
-import com.datasift.client.DataSiftClient;
import com.datasift.client.stream.DeletedInteraction;
-import com.datasift.client.stream.Interaction;
import com.datasift.client.stream.StreamEventListener;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
-import java.util.Map;
import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Requires Java Version 1.7!
@@ -51,27 +45,17 @@ public class DatasiftPushProvider implements StreamsProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
private DatasiftConfiguration config;
- protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
- private Map<String, DataSiftClient> clients;
- private StreamEventListener eventListener;
- private ObjectMapper mapper;
+ protected Queue<StreamsDatum> providerQueue;
- public DatasiftPushProvider() {
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- }
+ public DatasiftPushProvider() {
- // to set up a webhook we need to be able to return a reference to this queue
- public Queue<Interaction> getInteractions() {
- return interactions;
}
@Override
public void startStream() {
-
- Preconditions.checkNotNull(this.config);
- Preconditions.checkNotNull(this.config.getApiKey());
- Preconditions.checkNotNull(this.config.getUserName());
-
+ Preconditions.checkNotNull(providerQueue);
}
/**
@@ -85,23 +69,17 @@ public class DatasiftPushProvider implements StreamsProvider {
//This is a hack. It is only like this because of how perpetual streams work at the moment. Read list server to debate/vote for new interfaces.
public StreamsResultSet readCurrent() {
Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
- StreamsDatum datum = null;
- Interaction interaction;
- while (!this.interactions.isEmpty()) {
- interaction = this.interactions.poll();
- try {
- datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue());
- } catch (JsonProcessingException jpe) {
- LOGGER.error("Exception while converting Interaction to String : {}", jpe);
- }
- if (datum != null) {
- while (!datums.offer(datum)) {
- Thread.yield();
- }
- }
+ StreamsResultSet current = new StreamsResultSet(datums);
+ try {
+ lock.writeLock().lock();
+ current = new StreamsResultSet(providerQueue);
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
}
- return new StreamsResultSet(datums);
+
+ return current;
}
@Override
@@ -115,14 +93,12 @@ public class DatasiftPushProvider implements StreamsProvider {
@Override
public boolean isRunning() {
- return this.clients != null && this.clients.size() > 0;
+ return true;
}
@Override
public void prepare(Object configurationObject) {
- this.interactions = new ConcurrentLinkedQueue<Interaction>();
- this.clients = Maps.newHashMap();
- this.mapper = StreamsJacksonMapper.getInstance();
+ this.providerQueue = constructQueue();
}
@Override
@@ -138,6 +114,9 @@ public class DatasiftPushProvider implements StreamsProvider {
this.config = config;
}
+ private Queue<StreamsDatum> constructQueue() {
+ return Queues.newConcurrentLinkedQueue();
+ }
/**
* THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST