You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/21 20:35:11 UTC
[4/9] incubator-streams git commit: refactored DatasiftProvider to Resource
refactored DatasiftProvider to Resource
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbb69526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbb69526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbb69526
Branch: refs/heads/STREAMS-222
Commit: dbb69526e7469f6b988e9ed35793616d988102f6
Parents: dbda9ed
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 15:30:15 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 15:30:15 2014 -0600
----------------------------------------------------------------------
.../datasift/provider/DatasiftPushProvider.java | 196 ++++++++++++++-----
.../streams/datasift/DatasiftWebhookData.json | 30 +++
2 files changed, 180 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbb69526/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 264dbbe..2ab949f 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -20,66 +20,193 @@ package org.apache.streams.datasift.provider;
import com.datasift.client.stream.DeletedInteraction;
import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.Datasift;
import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.datasift.DatasiftWebhookData;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Resource;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import java.math.BigInteger;
+import java.util.List;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
/**
* Requires Java Version 1.7!
* {@code DatasiftPushProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface. The provider
* uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
*/
+@Resource
+@Path("/streams/webhooks/datasift")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
public class DatasiftPushProvider implements StreamsProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
- private DatasiftConfiguration config;
- protected Queue<StreamsDatum> providerQueue;
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- public DatasiftPushProvider() {
+ private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+ @POST
+ @Path("json")
+ public Response json(@Context HttpHeaders headers,
+ String body) {
+
+ ObjectNode response = mapper.createObjectNode();
+
+ StreamsDatum datum = new StreamsDatum(body);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
}
- @Override
- public void startStream() {
- Preconditions.checkNotNull(providerQueue);
+ @POST
+ @Path("json_new_line")
+ public Response json_new_line(@Context HttpHeaders headers,
+ String body) {
+
+ ObjectNode response = mapper.createObjectNode();
+
+ if (body.equalsIgnoreCase("{}")) {
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+ }
+
+ try {
+
+ for( String item : Splitter.on(newLinePattern).split(body)) {
+ StreamsDatum datum = new StreamsDatum(item);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
+
+ }
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+
+ } catch (Exception e) {
+ LOGGER.warn(e.toString(), e);
+
+ Boolean success = false;
+
+ response.put("success", success);
+
+ return Response.status(500).entity(response).build();
+
+ }
+
}
- /**
- * Shuts down all open streams from datasift.
- */
- public void stop() {
+ @POST
+ @Path("json_meta")
+ public Response json_meta(@Context HttpHeaders headers,
+ String body) {
+
+ //log.debug(headers.toString(), headers);
+
+ //log.debug(body.toString(), body);
+
+ ObjectNode response = mapper.createObjectNode();
+
+ if (body.equalsIgnoreCase("{}")) {
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+ }
+
+ try {
+
+ DatasiftWebhookData objectWrapper = mapper.readValue(body, DatasiftWebhookData.class);
+
+ for( Datasift item : objectWrapper.getInteractions()) {
+
+ String json = mapper.writeValueAsString(item);
+
+ StreamsDatum datum = new StreamsDatum(json);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
+ }
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+
+ } catch (Exception e) {
+ LOGGER.warn(e.toString(), e);
+ }
+
+ return Response.status(500).build();
+ }
+
+ @Override
+ public void startStream() {
+ return;
}
- // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
@Override
- //This is a hack. It is only like this because of how perpetual streams work at the moment. Read list server to debate/vote for new interfaces.
public StreamsResultSet readCurrent() {
- Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
- StreamsResultSet current = new StreamsResultSet(datums);
- try {
- lock.writeLock().lock();
- current = new StreamsResultSet(providerQueue);
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
+ StreamsResultSet current;
+
+ lock.writeLock().lock();
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+ providerQueue.clear();
+ lock.writeLock().unlock();
return current;
+
}
@Override
@@ -87,6 +214,7 @@ public class DatasiftPushProvider implements StreamsProvider {
return null;
}
+ @Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
return null;
}
@@ -98,36 +226,12 @@ public class DatasiftPushProvider implements StreamsProvider {
@Override
public void prepare(Object configurationObject) {
- this.providerQueue = constructQueue();
+
}
@Override
public void cleanUp() {
- stop();
- }
- public DatasiftConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(DatasiftConfiguration config) {
- this.config = config;
- }
-
- private Queue<StreamsDatum> constructQueue() {
- return Queues.newConcurrentLinkedQueue();
- }
-
- /**
- * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST
- * DELETE TWEETS FROM THEIR DATA STORE IF THEY RECEIVE A DELETE NOTICE.
- */
- public static class DeleteHandler extends StreamEventListener {
-
- public void onDelete(DeletedInteraction di) {
- //go off and delete the interaction if you have it stored. This is a strict requirement!
- LOGGER.info("DELETED:\n " + di);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbb69526/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
new file mode 100644
index 0000000..c4e43e8
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
@@ -0,0 +1,30 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "javaType": "org.apache.streams.datasift.DatasiftWebhookData",
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "hash": {
+ "type": "string"
+ },
+ "hash_type": {
+ "type": "string"
+ },
+ "count": {
+ "type": "long"
+ },
+ "delivered_at": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "interactions": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "javaType": "org.apache.streams.datasift.Datasift"
+ }
+ }
+ }
+}
\ No newline at end of file