You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 22:10:22 UTC
[07/14] git commit: adding push provider,
reverting change to streams provider
adding push provider, reverting change to streams provider
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f272ff53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f272ff53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f272ff53
Branch: refs/heads/master
Commit: f272ff5347f0a00b24402960b251c143e966f34f
Parents: eda5a3c
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Jul 27 19:13:25 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500
----------------------------------------------------------------------
.../datasift/provider/DatasiftPushProvider.java | 154 +++++++++++++++++++
.../com/datasift/DatasiftPushConfiguration.json | 17 ++
.../datasift/DatasiftStreamConfiguration.json | 17 ++
3 files changed, 188 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
new file mode 100644
index 0000000..196f504
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -0,0 +1,154 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.DataSiftClient;
+import com.datasift.client.stream.DeletedInteraction;
+import com.datasift.client.stream.Interaction;
+import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Requires Java Version 1.7!
+ * {@code DatasiftStreamProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface. The provider
+ * uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
+ */
+public class DatasiftPushProvider implements StreamsProvider {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
+
+ private DatasiftConfiguration config;
+ protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
+ private Map<String, DataSiftClient> clients;
+ private StreamEventListener eventListener;
+ private ObjectMapper mapper;
+
+ public DatasiftPushProvider() {
+
+ }
+
+ // to set up a webhook we need to be able to return a reference to this queue
+ public Queue<Interaction> getInteractions() {
+ return interactions;
+ }
+
+ @Override
+ public void startStream() {
+
+ Preconditions.checkNotNull(this.config);
+ Preconditions.checkNotNull(this.config.getApiKey());
+ Preconditions.checkNotNull(this.config.getUserName());
+
+ }
+
+ /**
+ * Shuts down all open streams from datasift.
+ */
+ public void stop() {
+ }
+
+ // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
+ @Override
+ //This is a hack. It is only like this because of how perpetual streams work at the moment. Read list server to debate/vote for new interfaces.
+ public StreamsResultSet readCurrent() {
+ Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
+ StreamsDatum datum = null;
+ Interaction interaction;
+ while (!this.interactions.isEmpty()) {
+ interaction = this.interactions.poll();
+ try {
+ datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue());
+ } catch (JsonProcessingException jpe) {
+ LOGGER.error("Exception while converting Interaction to String : {}", jpe);
+ }
+ if (datum != null) {
+ while (!datums.offer(datum)) {
+ Thread.yield();
+ }
+ }
+
+ }
+ return new StreamsResultSet(datums);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.clients != null && this.clients.size() > 0;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.interactions = new ConcurrentLinkedQueue<Interaction>();
+ this.clients = Maps.newHashMap();
+ this.mapper = StreamsJacksonMapper.getInstance();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
+
+ public DatasiftConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(DatasiftConfiguration config) {
+ this.config = config;
+ }
+
+
+ /**
+ * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST
+ * DELETE TWEETS FROM THEIR DATA STORE IF THEY RECEIVE A DELETE NOTICE.
+ */
+ public static class DeleteHandler extends StreamEventListener {
+
+ public void onDelete(DeletedInteraction di) {
+ //go off and delete the interaction if you have it stored. This is a strict requirement!
+ LOGGER.info("DELETED:\n " + di);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
new file mode 100644
index 0000000..bb65ef0
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
@@ -0,0 +1,17 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.datasift.DatasiftPushConfiguration",
+ "extends": {"$ref":"DatasiftConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "streamHash": {
+ "type": "array",
+ "minItems": 1,
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
new file mode 100644
index 0000000..91a9974
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
@@ -0,0 +1,17 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.datasift.DatasiftStreamConfiguration",
+ "extends": {"$ref":"DatasiftConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "streamHash": {
+ "type": "array",
+ "minItems": 1,
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file