You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 22:10:16 UTC

[01/14] git commit: adding push provider, reverting change to streams provider

Repository: incubator-streams
Updated Branches:
  refs/heads/master 41a80ed2e -> d309a5987


adding push provider, reverting change to streams provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d9e0a384
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d9e0a384
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d9e0a384

Branch: refs/heads/master
Commit: d9e0a384cda3820d53d40d47329fd65fb09c1282
Parents: f9d484b
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Jul 27 19:13:25 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Jul 27 19:14:12 2014 -0500

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 154 +++++++++++++++++++
 .../com/datasift/DatasiftPushConfiguration.json |  17 ++
 .../datasift/DatasiftStreamConfiguration.json   |  17 ++
 3 files changed, 188 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e0a384/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
new file mode 100644
index 0000000..196f504
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -0,0 +1,154 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.DataSiftClient;
+import com.datasift.client.stream.DeletedInteraction;
+import com.datasift.client.stream.Interaction;
+import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Requires Java Version 1.7!
+ * {@code DatasiftPushProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface.  The provider
+ * uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
+ */
+public class DatasiftPushProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
+
+    private DatasiftConfiguration config;
+    protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
+    private Map<String, DataSiftClient> clients;
+    private StreamEventListener eventListener;
+    private ObjectMapper mapper;
+
+    public DatasiftPushProvider() {
+
+    }
+
+    // to set up a webhook we need to be able to return a reference to this queue
+    public Queue<Interaction> getInteractions() {
+        return interactions;
+    }
+
+    @Override
+    public void startStream() {
+
+        Preconditions.checkNotNull(this.config);
+        Preconditions.checkNotNull(this.config.getApiKey());
+        Preconditions.checkNotNull(this.config.getUserName());
+
+    }
+
+    /**
+     * Shuts down all open streams from datasift.
+     */
+    public void stop() {
+    }
+
+    // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
+    @Override
+    //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
+    public StreamsResultSet readCurrent() {
+        Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
+        StreamsDatum datum = null;
+        Interaction interaction;
+        while (!this.interactions.isEmpty()) {
+            interaction = this.interactions.poll();
+            try {
+                datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue());
+            } catch (JsonProcessingException jpe) {
+                LOGGER.error("Exception while converting Interaction to String : {}", jpe);
+            }
+            if (datum != null) {
+                while (!datums.offer(datum)) {
+                    Thread.yield();
+                }
+            }
+
+        }
+        return new StreamsResultSet(datums);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.clients != null && this.clients.size() > 0;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.interactions = new ConcurrentLinkedQueue<Interaction>();
+        this.clients = Maps.newHashMap();
+        this.mapper = StreamsJacksonMapper.getInstance();
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
+
+    public DatasiftConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(DatasiftConfiguration config) {
+        this.config = config;
+    }
+
+
+    /**
+     * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTER'S TERMS OF SERVICE SAY THAT EVERYONE MUST
+     * DELETE TWEETS FROM THEIR DATA STORE IF THEY RECEIVE A DELETE NOTICE.
+     */
+    public static class DeleteHandler extends StreamEventListener {
+
+        public void onDelete(DeletedInteraction di) {
+            //go off and delete the interaction if you have it stored. This is a strict requirement!
+            LOGGER.info("DELETED:\n " + di);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e0a384/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
new file mode 100644
index 0000000..bb65ef0
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.datasift.DatasiftPushConfiguration",
+    "extends": {"$ref":"DatasiftConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "streamHash": {
+            "type": "array",
+            "minItems": 1,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d9e0a384/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
new file mode 100644
index 0000000..91a9974
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.datasift.DatasiftStreamConfiguration",
+    "extends": {"$ref":"DatasiftConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "streamHash": {
+            "type": "array",
+            "minItems": 1,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file


[12/14] git commit: created a DatasiftObjectMapper capable of parsing either RFC3339 or the standard Datasift date string; changed all classes in package except configurator to use this Mapper

Posted by sb...@apache.org.
created a DatasiftObjectMapper capable of parsing either RFC3339 or the standard Datasift date string
changed all classes in package except configurator to use this Mapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3af77bd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3af77bd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3af77bd1

Branch: refs/heads/master
Commit: 3af77bd1ff83e4c882eb90ea899a02661fdbd2ee
Parents: fec8a37
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:24:14 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500

----------------------------------------------------------------------
 .../provider/DatasiftStreamProvider.java        |  4 +-
 .../DatasiftTypeConverterProcessor.java         |  7 +-
 .../serializer/DatasiftActivitySerializer.java  |  4 +-
 .../DatasiftDefaultActivitySerializer.java      | 21 ++---
 .../DatasiftTweetActivitySerializer.java        | 14 ++--
 .../datasift/util/StreamsDatasiftMapper.java    | 84 ++++++++++++++++++++
 .../DatasiftActivitySerializerTest.java         | 31 +++++++-
 7 files changed, 135 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index 09c01b0..8ed1443 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -35,7 +35,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,7 +202,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
     public void prepare(Object configurationObject) {
         this.interactions = new ConcurrentLinkedQueue<Interaction>();
         this.clients = Maps.newHashMap();
-        this.mapper = StreamsJacksonMapper.getInstance();
+        this.mapper = StreamsDatasiftMapper.getInstance();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
index 203e5e8..0b847a4 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
@@ -21,10 +21,11 @@ package org.apache.streams.datasift.provider;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
-import org.apache.streams.core.*;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +67,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
 
     @Override
     public void prepare(Object configurationObject) {
-        this.mapper = StreamsJacksonMapper.getInstance();
+        this.mapper = StreamsDatasiftMapper.getInstance();
         this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
         if(this.outClass.equals(Activity.class)) {
             this.converter = new ActivityConverter();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
index 1d5fd2c..7d644f0 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -21,8 +21,8 @@ package org.apache.streams.datasift.serializer;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
 import java.util.List;
@@ -34,7 +34,7 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>
 
     private static final DatasiftTweetActivitySerializer TWITTER_SERIALIZER = new DatasiftTweetActivitySerializer();
     private static final DatasiftDefaultActivitySerializer DEFAULT_SERIALIZER = new DatasiftDefaultActivitySerializer();
-    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
 
     @Override
     public String serializationFormat() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
index 4095df6..b70aa12 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
@@ -1,15 +1,13 @@
 package org.apache.streams.datasift.serializer;
 
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.*;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.interaction.Links;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.pojo.json.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +28,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
 
     public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
 
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
 
     @Override
     public String serializationFormat() {
@@ -46,7 +44,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
         try {
             return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
         } catch (Exception e) {
-            LOGGER.error("Excpetion while trying convert,\n {},\n to a Datasift object.", datasiftJson);
+            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
             LOGGER.error("Exception : {}", e);
             throw new RuntimeException(e);
         }
@@ -55,13 +53,6 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
     @Override
     public Activity deserialize(Datasift serialized) {
 
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
         try {
 
             Activity activity = convert(serialized);
@@ -141,7 +132,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
         Actor actor = new Actor();
         org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
         if(author == null) {
-            LOGGER.warn("Interactiond does not contain author information.");
+            LOGGER.warn("Interaction does not contain author information.");
             return actor;
         }
         String userName = author.getUsername();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index e7b0a52..b16aae2 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -22,8 +22,8 @@ package org.apache.streams.datasift.serializer;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.streams.data.util.RFC3339Utils;
 import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Author;
 import org.apache.streams.datasift.interaction.Interaction;
 import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
 import org.apache.streams.datasift.twitter.Retweet;
@@ -134,7 +134,7 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
                 .orNull()));
         actor.setSummary(user.getDescription());
         try {
-            actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
+            actor.setPublished(user.getCreatedAt());
         } catch (Exception e) {
             LOGGER.warn("Exception trying to parse date : {}", e);
         }
@@ -149,14 +149,16 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         extensions.put("followers", user.getFollowersCount());
         extensions.put("screenName", user.getScreenName());
         if(user.getAdditionalProperties() != null) {
-            extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
+            extensions.put("favorites", user.getFavouritesCount());
         }
 
         Image profileImage = new Image();
         String profileUrl = null;
-        profileUrl = event.getInteraction().getAuthor().getAvatar();
-        if(profileUrl == null && user.getAdditionalProperties() != null) {
-            Object url = user.getAdditionalProperties().get("profile_image_url_https");
+        Author author = event.getInteraction().getAuthor();
+        if( author != null )
+            profileUrl = author.getAvatar();
+        if(profileUrl == null && user.getProfileImageUrlHttps() != null) {
+            Object url = user.getProfileImageUrlHttps();
             if(url instanceof String)
                 profileUrl = (String) url;
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
new file mode 100644
index 0000000..c5f2abf
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.datasift.util;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsDatasiftMapper extends StreamsJacksonMapper {
+
+    public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z");
+
+    public static final Long getMillis(String dateTime) {
+
+        // this function is for pig which doesn't handle exceptions well
+        try {
+            Long result = DATASIFT_FORMAT.parseMillis(dateTime);
+            return result;
+        } catch( Exception e ) {
+            return null;
+        }
+
+    }
+
+    private static final StreamsDatasiftMapper INSTANCE = new StreamsDatasiftMapper();
+
+    public static StreamsDatasiftMapper getInstance(){
+        return INSTANCE;
+    }
+
+    public StreamsDatasiftMapper() {
+        super();
+        registerModule(new SimpleModule()
+        {
+            {
+                addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+                    @Override
+                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+                        DateTime result = null;
+                        try {
+                            result = DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
+                        } catch (Exception e) {}
+                        if (result == null) {
+                            try {
+                                result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());
+                            } catch (Exception e) {}
+                        }
+                        return result;
+                    }
+                });
+            }
+        });
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
index 88dd2d6..90b7285 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
@@ -2,7 +2,8 @@ package org.apache.streams.datasift.serializer;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
 import org.junit.Test;
@@ -11,11 +12,12 @@ import java.util.Scanner;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 public class DatasiftActivitySerializerTest {
 
     private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer();
-    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
 
     @Test
     public void testGeneralConversion() throws Exception {
@@ -39,6 +41,8 @@ public class DatasiftActivitySerializerTest {
         while(scanner.hasNextLine()) {
             line = scanner.nextLine();
             testGeneralConversion(line);
+            testDeserNoNull(line);
+            testDeserNoAddProps(line);
             System.out.println("ORIGINAL -> "+line);
             System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line)));
             System.out.println("NODE     -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class));
@@ -59,8 +63,31 @@ public class DatasiftActivitySerializerTest {
         assertNotNull(json, activity.getUrl());
         Actor actor = activity.getActor();
         assertNotNull(json, actor);
+
     }
 
+    /**
+     * Test that null fields are not present
+     * @param json
+     */
+    private void testDeserNoNull(String json) throws Exception {
+        Activity ser = SERIALIZER.deserialize(json);
+        String deser = MAPPER.writeValueAsString(ser);
+        int nulls = StringUtils.countMatches(deser, ":null");
+        assertEquals(0l, (long)nulls);
+
+    }
 
+    /**
+     * Test that no additionalProperties fields are present
+     * @param json
+     */
+    private void testDeserNoAddProps(String json) throws Exception {
+        Activity ser = SERIALIZER.deserialize(json);
+        String deser = MAPPER.writeValueAsString(ser);
+        int nulls = StringUtils.countMatches(deser, "additionalProperties:{");
+        assertEquals(0l, (long)nulls);
+
+    }
 
 }


[08/14] git commit: simplify/isolate push provider

Posted by sb...@apache.org.
simplify/isolate push provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2e66e51a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2e66e51a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2e66e51a

Branch: refs/heads/master
Commit: 2e66e51a9fbef1e1fb7ae922a37d94d38dcf254a
Parents: f272ff5
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:22:22 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 61 +++++++-------------
 1 file changed, 20 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2e66e51a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 196f504..264dbbe 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -18,28 +18,22 @@ under the License.
 */
 package org.apache.streams.datasift.provider;
 
-import com.datasift.client.DataSiftClient;
 import com.datasift.client.stream.DeletedInteraction;
-import com.datasift.client.stream.Interaction;
 import com.datasift.client.stream.StreamEventListener;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Requires Java Version 1.7!
@@ -51,27 +45,17 @@ public class DatasiftPushProvider implements StreamsProvider {
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
 
     private DatasiftConfiguration config;
-    protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
-    private Map<String, DataSiftClient> clients;
-    private StreamEventListener eventListener;
-    private ObjectMapper mapper;
+    protected Queue<StreamsDatum> providerQueue;
 
-    public DatasiftPushProvider() {
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    }
+    public DatasiftPushProvider() {
 
-    // to set up a webhook we need to be able to return a reference to this queue
-    public Queue<Interaction> getInteractions() {
-        return interactions;
     }
 
     @Override
     public void startStream() {
-
-        Preconditions.checkNotNull(this.config);
-        Preconditions.checkNotNull(this.config.getApiKey());
-        Preconditions.checkNotNull(this.config.getUserName());
-
+        Preconditions.checkNotNull(providerQueue);
     }
 
     /**
@@ -85,23 +69,17 @@ public class DatasiftPushProvider implements StreamsProvider {
     //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
     public StreamsResultSet readCurrent() {
         Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
-        StreamsDatum datum = null;
-        Interaction interaction;
-        while (!this.interactions.isEmpty()) {
-            interaction = this.interactions.poll();
-            try {
-                datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue());
-            } catch (JsonProcessingException jpe) {
-                LOGGER.error("Exception while converting Interaction to String : {}", jpe);
-            }
-            if (datum != null) {
-                while (!datums.offer(datum)) {
-                    Thread.yield();
-                }
-            }
 
+        StreamsResultSet current = new StreamsResultSet(datums);
+        try {
+            lock.writeLock().lock();
+            current = new StreamsResultSet(providerQueue);
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
         }
-        return new StreamsResultSet(datums);
+
+        return current;
     }
 
     @Override
@@ -115,14 +93,12 @@ public class DatasiftPushProvider implements StreamsProvider {
 
     @Override
     public boolean isRunning() {
-        return this.clients != null && this.clients.size() > 0;
+        return true;
     }
 
     @Override
     public void prepare(Object configurationObject) {
-        this.interactions = new ConcurrentLinkedQueue<Interaction>();
-        this.clients = Maps.newHashMap();
-        this.mapper = StreamsJacksonMapper.getInstance();
+        this.providerQueue = constructQueue();
     }
 
     @Override
@@ -138,6 +114,9 @@ public class DatasiftPushProvider implements StreamsProvider {
         this.config = config;
     }
 
+    private Queue<StreamsDatum> constructQueue() {
+        return Queues.newConcurrentLinkedQueue();
+    }
 
     /**
      * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST


[02/14] git commit: additions to enable streams-api

Posted by sb...@apache.org.
additions to enable streams-api


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f9d484b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f9d484b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f9d484b8

Branch: refs/heads/master
Commit: f9d484b8421640e501e84f1c056b079a1404446d
Parents: c454d52
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 16:59:23 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Jul 27 19:14:12 2014 -0500

----------------------------------------------------------------------
 pom.xml                                                   |  2 +-
 streams-contrib/streams-provider-datasift/pom.xml         |  2 +-
 .../datasift/provider/DatasiftStreamConfigurator.java     | 10 ++++------
 .../streams/datasift/provider/DatasiftStreamProvider.java |  8 ++++++--
 4 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9d484b8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index eebd5be..419ef89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <java.version>1.6</java.version>
+        <java.version>1.7</java.version>
         <org.osgi.service.http.port>8080</org.osgi.service.http.port>
         <org.osgi.service.http.port.secure>8443</org.osgi.service.http.port.secure>
         <jackson.version>2.3.2</jackson.version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9d484b8/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml
index 5c5f674..3b9f96f 100644
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ b/streams-contrib/streams-provider-datasift/pom.xml
@@ -83,7 +83,7 @@
 
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <testSourceDirectory>src/test/java17</testSourceDirectory>
         <resources>
             <resource>
                 <directory>src/main/resources</directory>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9d484b8/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
index d85d6e5..7025d39 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
@@ -19,7 +19,9 @@ under the License.
 package org.apache.streams.datasift.provider;
 
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
 import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,12 +34,8 @@ public class DatasiftStreamConfigurator {
 
     public static DatasiftConfiguration detectConfiguration(Config datasift) {
 
-        DatasiftConfiguration datasiftConfiguration = new DatasiftConfiguration();
-
-        datasiftConfiguration.setApiKey(datasift.getString("apiKey"));
-        datasiftConfiguration.setUserName(datasift.getString("userName"));
-        datasiftConfiguration.setStreamHash(datasift.getStringList("hashes"));
-
+        DatasiftConfiguration datasiftConfiguration;
+        datasiftConfiguration = StreamsJacksonMapper.getInstance().convertValue(datasift.root().render(ConfigRenderOptions.concise()), DatasiftConfiguration.class);
         return datasiftConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9d484b8/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index 0dc4c92..09c01b0 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -55,7 +55,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamProvider.class);
 
     private DatasiftConfiguration config;
-    private ConcurrentLinkedQueue<Interaction> interactions;
+    private ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
     private Map<String, DataSiftClient> clients;
     private StreamEventListener eventListener;
     private ObjectMapper mapper;
@@ -69,6 +69,11 @@ public class DatasiftStreamProvider implements StreamsProvider {
         this(listener, null);
     }
 
+    // to set up a webhook we need to be able to return a reference to this queue
+    public Queue<Interaction> getInteractions() {
+        return interactions;
+    }
+
     /**
      * @param listener {@link com.datasift.client.stream.StreamEventListener} that handles deletion notices received from twitter.
      * @param config   Configuration to use
@@ -184,7 +189,6 @@ public class DatasiftStreamProvider implements StreamsProvider {
         return null;
     }
 
-    @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         return null;
     }


[03/14] git commit: simplify/isolate push provider

Posted by sb...@apache.org.
simplify/isolate push provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9e774a8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9e774a8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9e774a8a

Branch: refs/heads/master
Commit: 9e774a8a37f27187766eb02255f4a900ae37b711
Parents: d9e0a38
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:22:22 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:22:22 2014 -0500

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 61 +++++++-------------
 1 file changed, 20 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9e774a8a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 196f504..264dbbe 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -18,28 +18,22 @@ under the License.
 */
 package org.apache.streams.datasift.provider;
 
-import com.datasift.client.DataSiftClient;
 import com.datasift.client.stream.DeletedInteraction;
-import com.datasift.client.stream.Interaction;
 import com.datasift.client.stream.StreamEventListener;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Requires Java Version 1.7!
@@ -51,27 +45,17 @@ public class DatasiftPushProvider implements StreamsProvider {
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
 
     private DatasiftConfiguration config;
-    protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
-    private Map<String, DataSiftClient> clients;
-    private StreamEventListener eventListener;
-    private ObjectMapper mapper;
+    protected Queue<StreamsDatum> providerQueue;
 
-    public DatasiftPushProvider() {
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    }
+    public DatasiftPushProvider() {
 
-    // to set up a webhook we need to be able to return a reference to this queue
-    public Queue<Interaction> getInteractions() {
-        return interactions;
     }
 
     @Override
     public void startStream() {
-
-        Preconditions.checkNotNull(this.config);
-        Preconditions.checkNotNull(this.config.getApiKey());
-        Preconditions.checkNotNull(this.config.getUserName());
-
+        Preconditions.checkNotNull(providerQueue);
     }
 
     /**
@@ -85,23 +69,17 @@ public class DatasiftPushProvider implements StreamsProvider {
     //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
     public StreamsResultSet readCurrent() {
         Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
-        StreamsDatum datum = null;
-        Interaction interaction;
-        while (!this.interactions.isEmpty()) {
-            interaction = this.interactions.poll();
-            try {
-                datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue());
-            } catch (JsonProcessingException jpe) {
-                LOGGER.error("Exception while converting Interaction to String : {}", jpe);
-            }
-            if (datum != null) {
-                while (!datums.offer(datum)) {
-                    Thread.yield();
-                }
-            }
 
+        StreamsResultSet current = new StreamsResultSet(datums);
+        try {
+            lock.writeLock().lock();
+            current = new StreamsResultSet(providerQueue);
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
         }
-        return new StreamsResultSet(datums);
+
+        return current;
     }
 
     @Override
@@ -115,14 +93,12 @@ public class DatasiftPushProvider implements StreamsProvider {
 
     @Override
     public boolean isRunning() {
-        return this.clients != null && this.clients.size() > 0;
+        return true;
     }
 
     @Override
     public void prepare(Object configurationObject) {
-        this.interactions = new ConcurrentLinkedQueue<Interaction>();
-        this.clients = Maps.newHashMap();
-        this.mapper = StreamsJacksonMapper.getInstance();
+        this.providerQueue = constructQueue();
     }
 
     @Override
@@ -138,6 +114,9 @@ public class DatasiftPushProvider implements StreamsProvider {
         this.config = config;
     }
 
+    private Queue<StreamsDatum> constructQueue() {
+        return Queues.newConcurrentLinkedQueue();
+    }
 
     /**
      * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST


[14/14] git commit: Merge branch 'STREAMS-138' of https://git-wip-us.apache.org/repos/asf/incubator-streams

Posted by sb...@apache.org.
Merge branch 'STREAMS-138' of https://git-wip-us.apache.org/repos/asf/incubator-streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d309a598
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d309a598
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d309a598

Branch: refs/heads/master
Commit: d309a5987d1f0d1c38142ec366835d58edfa8221
Parents: 41a80ed 5dec4aa
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Aug 8 15:07:08 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Aug 8 15:07:08 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../streams-provider-datasift/pom.xml           |   2 +-
 .../datasift/provider/DatasiftPushProvider.java | 133 +++++++++++++++++++
 .../provider/DatasiftStreamConfigurator.java    |  10 +-
 .../provider/DatasiftStreamProvider.java        |  12 +-
 .../DatasiftTypeConverterProcessor.java         |   7 +-
 .../serializer/DatasiftActivitySerializer.java  |   4 +-
 .../DatasiftDefaultActivitySerializer.java      |  21 +--
 .../DatasiftTweetActivitySerializer.java        |  14 +-
 .../datasift/util/StreamsDatasiftMapper.java    |  84 ++++++++++++
 .../main/jsonschema/com/datasift/Datasift.json  |  36 ++++-
 .../com/datasift/DatasiftPushConfiguration.json |  17 +++
 .../datasift/DatasiftStreamConfiguration.json   |  17 +++
 .../com/datasift/DatasiftTwitterUser.json       |   9 +-
 .../DatasiftActivitySerializerTest.java         |  31 ++++-
 .../local/tasks/StreamsProviderTask.java        |   3 +-
 16 files changed, 357 insertions(+), 45 deletions(-)
----------------------------------------------------------------------



[07/14] git commit: adding push provider, reverting change to streams provider

Posted by sb...@apache.org.
adding push provider, reverting change to streams provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f272ff53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f272ff53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f272ff53

Branch: refs/heads/master
Commit: f272ff5347f0a00b24402960b251c143e966f34f
Parents: eda5a3c
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Jul 27 19:13:25 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 154 +++++++++++++++++++
 .../com/datasift/DatasiftPushConfiguration.json |  17 ++
 .../datasift/DatasiftStreamConfiguration.json   |  17 ++
 3 files changed, 188 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
new file mode 100644
index 0000000..196f504
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -0,0 +1,154 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.DataSiftClient;
+import com.datasift.client.stream.DeletedInteraction;
+import com.datasift.client.stream.Interaction;
+import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Requires Java Version 1.7!
+ * {@code DatasiftPushProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface.  The provider
+ * uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
+ */
+public class DatasiftPushProvider implements StreamsProvider {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
+
+    private DatasiftConfiguration config;
+    protected ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
+    private Map<String, DataSiftClient> clients;
+    private StreamEventListener eventListener;
+    private ObjectMapper mapper;
+
+    public DatasiftPushProvider() {
+
+    }
+
+    // to set up a webhook we need to be able to return a reference to this queue
+    public Queue<Interaction> getInteractions() {
+        return interactions;
+    }
+
+    @Override
+    public void startStream() {
+
+        Preconditions.checkNotNull(this.config);
+        Preconditions.checkNotNull(this.config.getApiKey());
+        Preconditions.checkNotNull(this.config.getUserName());
+
+    }
+
+    /**
+     * Shuts down all open streams from datasift.
+     */
+    public void stop() {
+    }
+
+    // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
+    @Override
+    //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
+    public StreamsResultSet readCurrent() {
+        Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
+        StreamsDatum datum = null;
+        Interaction interaction;
+        while (!this.interactions.isEmpty()) {
+            interaction = this.interactions.poll();
+            try {
+                datum = new StreamsDatum(this.mapper.writeValueAsString(interaction.getData()), interaction.getData().get("interaction").get("id").textValue());
+            } catch (JsonProcessingException jpe) {
+                LOGGER.error("Exception while converting Interaction to String : {}", jpe);
+            }
+            if (datum != null) {
+                while (!datums.offer(datum)) {
+                    Thread.yield();
+                }
+            }
+
+        }
+        return new StreamsResultSet(datums);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.clients != null && this.clients.size() > 0;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.interactions = new ConcurrentLinkedQueue<Interaction>();
+        this.clients = Maps.newHashMap();
+        this.mapper = StreamsJacksonMapper.getInstance();
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
+
+    public DatasiftConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(DatasiftConfiguration config) {
+        this.config = config;
+    }
+
+
+    /**
+     * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST
+     * DELETE TWEETS FROM THEIR DATA STORE IF THEY RECEIVE A DELETE NOTICE.
+     */
+    public static class DeleteHandler extends StreamEventListener {
+
+        public void onDelete(DeletedInteraction di) {
+            //go off and delete the interaction if you have it stored. This is a strict requirement!
+            LOGGER.info("DELETED:\n " + di);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
new file mode 100644
index 0000000..bb65ef0
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftPushConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.datasift.DatasiftPushConfiguration",
+    "extends": {"$ref":"DatasiftConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "streamHash": {
+            "type": "array",
+            "minItems": 1,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f272ff53/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
new file mode 100644
index 0000000..91a9974
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftStreamConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.datasift.DatasiftStreamConfiguration",
+    "extends": {"$ref":"DatasiftConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "streamHash": {
+            "type": "array",
+            "minItems": 1,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file


[11/14] git commit: added null check to get rid of common exception seen while stream initializing

Posted by sb...@apache.org.
added null check to get rid of common exception seen while stream initializing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e7141a4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e7141a4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e7141a4d

Branch: refs/heads/master
Commit: e7141a4d1a7fcaa8c99827f02257ee2987e6d826
Parents: 3af77bd
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:25:17 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/local/tasks/StreamsProviderTask.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e7141a4d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index f5189eb..fe7ea95 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -168,7 +168,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                     break;
                 default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
             }
-            flushResults(resultSet);
+            if( resultSet != null )
+                flushResults(resultSet);
 
         } catch( Exception e ) {
             LOGGER.error("Error in processing provider stream", e);


[06/14] git commit: added null check to get rid of common exception seen while stream initializing

Posted by sb...@apache.org.
added null check to get rid of common exception seen while stream initializing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/01a1a73b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/01a1a73b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/01a1a73b

Branch: refs/heads/master
Commit: 01a1a73be4926c9d518c5d1afb10f7b54db489f5
Parents: b730132
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:25:17 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:25:17 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/local/tasks/StreamsProviderTask.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/01a1a73b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index f5189eb..fe7ea95 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -168,7 +168,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                     break;
                 default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
             }
-            flushResults(resultSet);
+            if( resultSet != null )
+                flushResults(resultSet);
 
         } catch( Exception e ) {
             LOGGER.error("Error in processing provider stream", e);


[04/14] git commit: added missing fields switched a few date fields to Joda

Posted by sb...@apache.org.
added missing fields
switched a few date fields to Joda


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bdf30f78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bdf30f78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bdf30f78

Branch: refs/heads/master
Commit: bdf30f78a31e4a4a8a11e979488b62f3d914c9a1
Parents: 9e774a8
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:22:57 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:22:57 2014 -0500

----------------------------------------------------------------------
 .../main/jsonschema/com/datasift/Datasift.json  | 36 ++++++++++++++++++--
 .../com/datasift/DatasiftTwitterUser.json       |  9 ++++-
 2 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bdf30f78/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
index 8d4ef87..37a63a0 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
@@ -206,6 +206,25 @@
                 "link": {
                     "type": "string"
                 },
+                "mention_ids": {
+                    "type": "array",
+                    "items": [
+                        {
+                            "type": "integer"
+                        }
+                    ]
+                },
+                "mentions": {
+                    "type": "array",
+                    "items": [
+                        {
+                            "type": "string"
+                        }
+                    ]
+                },
+                "source": {
+                    "type": "string"
+                },
                 "schema": {
                     "dynamic": "true",
                     "properties": {
@@ -253,6 +272,9 @@
                 },
                 "tag": {
                     "type": "string"
+                },
+                "tag_extended": {
+                    "type": "string"
                 }
             }
         },
@@ -264,10 +286,10 @@
                 "code": {
                     "type": "array",
                     "items": [
-                            {
+                        {
                             "type": "integer"
-                            }
-                        ]
+                        }
+                    ]
                 },
                 "created_at": {
                     "type": "array",
@@ -576,6 +598,14 @@
                 "created_at": {
                     "type": "string"
                 },
+                "display_urls": {
+                    "type": "array",
+                    "items": [
+                        {
+                            "type": "string"
+                        }
+                    ]
+                },
                 "domains": {
                     "type": "array",
                     "items": [

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bdf30f78/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
index 97d93fe..3be49ff 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
@@ -4,11 +4,15 @@
     "javaType": "org.apache.streams.datasift.twitter.DatasiftTwitterUser",
     "properties": {
         "created_at": {
-            "type": "string"
+            "type": "string",
+            "format": "date-time"
         },
         "description": {
             "type": "string"
         },
+        "favourites_count": {
+            "type": "integer"
+        },
         "followers_count": {
             "type": "integer"
         },
@@ -39,6 +43,9 @@
         "profile_image_url": {
             "type": "string"
         },
+        "profile_image_url_https": {
+            "type": "string"
+        },
         "screen_name": {
             "type": "string"
         },


[05/14] git commit: created a DatasiftObjectMapper capable of parsing either RFC3339 or the standard Datasift date string changed all classes in package except configurator to use this Mapper

Posted by sb...@apache.org.
created a DatasiftObjectMapper capable of parsing either RFC3339 or the standard Datasift date string
changed all classes in package except configurator to use this Mapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7301326
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7301326
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7301326

Branch: refs/heads/master
Commit: b7301326a83936a528219c5b84b35a86a57d5971
Parents: bdf30f7
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:24:14 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:24:14 2014 -0500

----------------------------------------------------------------------
 .../provider/DatasiftStreamProvider.java        |  4 +-
 .../DatasiftTypeConverterProcessor.java         |  7 +-
 .../serializer/DatasiftActivitySerializer.java  |  4 +-
 .../DatasiftDefaultActivitySerializer.java      | 21 ++---
 .../DatasiftTweetActivitySerializer.java        | 14 ++--
 .../datasift/util/StreamsDatasiftMapper.java    | 84 ++++++++++++++++++++
 .../DatasiftActivitySerializerTest.java         | 31 +++++++-
 7 files changed, 135 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index 09c01b0..8ed1443 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -35,7 +35,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,7 +202,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
     public void prepare(Object configurationObject) {
         this.interactions = new ConcurrentLinkedQueue<Interaction>();
         this.clients = Maps.newHashMap();
-        this.mapper = StreamsJacksonMapper.getInstance();
+        this.mapper = StreamsDatasiftMapper.getInstance();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
index 203e5e8..0b847a4 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
@@ -21,10 +21,11 @@ package org.apache.streams.datasift.provider;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
-import org.apache.streams.core.*;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +67,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
 
     @Override
     public void prepare(Object configurationObject) {
-        this.mapper = StreamsJacksonMapper.getInstance();
+        this.mapper = StreamsDatasiftMapper.getInstance();
         this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
         if(this.outClass.equals(Activity.class)) {
             this.converter = new ActivityConverter();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
index 1d5fd2c..7d644f0 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -21,8 +21,8 @@ package org.apache.streams.datasift.serializer;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
 import java.util.List;
@@ -34,7 +34,7 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>
 
     private static final DatasiftTweetActivitySerializer TWITTER_SERIALIZER = new DatasiftTweetActivitySerializer();
     private static final DatasiftDefaultActivitySerializer DEFAULT_SERIALIZER = new DatasiftDefaultActivitySerializer();
-    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
 
     @Override
     public String serializationFormat() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
index 4095df6..b70aa12 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
@@ -1,15 +1,13 @@
 package org.apache.streams.datasift.serializer;
 
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.*;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.interaction.Links;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.pojo.json.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +28,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
 
     public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
 
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
 
     @Override
     public String serializationFormat() {
@@ -46,7 +44,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
         try {
             return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
         } catch (Exception e) {
-            LOGGER.error("Excpetion while trying convert,\n {},\n to a Datasift object.", datasiftJson);
+            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
             LOGGER.error("Exception : {}", e);
             throw new RuntimeException(e);
         }
@@ -55,13 +53,6 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
     @Override
     public Activity deserialize(Datasift serialized) {
 
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
         try {
 
             Activity activity = convert(serialized);
@@ -141,7 +132,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
         Actor actor = new Actor();
         org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
         if(author == null) {
-            LOGGER.warn("Interactiond does not contain author information.");
+            LOGGER.warn("Interaction does not contain author information.");
             return actor;
         }
         String userName = author.getUsername();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index e7b0a52..b16aae2 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -22,8 +22,8 @@ package org.apache.streams.datasift.serializer;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.streams.data.util.RFC3339Utils;
 import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Author;
 import org.apache.streams.datasift.interaction.Interaction;
 import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
 import org.apache.streams.datasift.twitter.Retweet;
@@ -134,7 +134,7 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
                 .orNull()));
         actor.setSummary(user.getDescription());
         try {
-            actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
+            actor.setPublished(user.getCreatedAt());
         } catch (Exception e) {
             LOGGER.warn("Exception trying to parse date : {}", e);
         }
@@ -149,14 +149,16 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         extensions.put("followers", user.getFollowersCount());
         extensions.put("screenName", user.getScreenName());
         if(user.getAdditionalProperties() != null) {
-            extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
+            extensions.put("favorites", user.getFavouritesCount());
         }
 
         Image profileImage = new Image();
         String profileUrl = null;
-        profileUrl = event.getInteraction().getAuthor().getAvatar();
-        if(profileUrl == null && user.getAdditionalProperties() != null) {
-            Object url = user.getAdditionalProperties().get("profile_image_url_https");
+        Author author = event.getInteraction().getAuthor();
+        if( author != null )
+            profileUrl = author.getAvatar();
+        if(profileUrl == null && user.getProfileImageUrlHttps() != null) {
+            Object url = user.getProfileImageUrlHttps();
             if(url instanceof String)
                 profileUrl = (String) url;
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
new file mode 100644
index 0000000..c5f2abf
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.datasift.util;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsDatasiftMapper extends StreamsJacksonMapper {
+
+    public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z");
+
+    public static final Long getMillis(String dateTime) {
+
+        // this function is for Pig, which doesn't handle exceptions well: return null on any parse failure instead of throwing
+        try {
+            Long result = DATASIFT_FORMAT.parseMillis(dateTime);
+            return result;
+        } catch( Exception e ) {
+            return null;
+        }
+
+    }
+
+    private static final StreamsDatasiftMapper INSTANCE = new StreamsDatasiftMapper();
+
+    public static StreamsDatasiftMapper getInstance(){
+        return INSTANCE;
+    }
+
+    public StreamsDatasiftMapper() {
+        super();
+        registerModule(new SimpleModule()
+        {
+            {
+                addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+                    @Override
+                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+                        DateTime result = null;
+                        try {
+                            result = DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
+                        } catch (Exception e) {}
+                        if (result == null) {
+                            try {
+                                result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());
+                            } catch (Exception e) {}
+                        }
+                        return result;
+                    }
+                });
+            }
+        });
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7301326/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
index 88dd2d6..90b7285 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
@@ -2,7 +2,8 @@ package org.apache.streams.datasift.serializer;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
 import org.junit.Test;
@@ -11,11 +12,12 @@ import java.util.Scanner;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 public class DatasiftActivitySerializerTest {
 
     private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer();
-    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
 
     @Test
     public void testGeneralConversion() throws Exception {
@@ -39,6 +41,8 @@ public class DatasiftActivitySerializerTest {
         while(scanner.hasNextLine()) {
             line = scanner.nextLine();
             testGeneralConversion(line);
+            testDeserNoNull(line);
+            testDeserNoAddProps(line);
             System.out.println("ORIGINAL -> "+line);
             System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line)));
             System.out.println("NODE     -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class));
@@ -59,8 +63,31 @@ public class DatasiftActivitySerializerTest {
         assertNotNull(json, activity.getUrl());
         Actor actor = activity.getActor();
         assertNotNull(json, actor);
+
     }
 
+    /**
+     * Test that null fields are not present
+     * @param json
+     */
+    private void testDeserNoNull(String json) throws Exception {
+        Activity ser = SERIALIZER.deserialize(json);
+        String deser = MAPPER.writeValueAsString(ser);
+        int nulls = StringUtils.countMatches(deser, ":null");
+        assertEquals(0l, (long)nulls);
+
+    }
 
+    /**
+     * Test that no additionalProperties object is present in the serialized output
+     * @param json
+     */
+    private void testDeserNoAddProps(String json) throws Exception {
+        Activity ser = SERIALIZER.deserialize(json);
+        String deser = MAPPER.writeValueAsString(ser);
+        int nulls = StringUtils.countMatches(deser, "additionalProperties:{");
+        assertEquals(0l, (long)nulls);
+
+    }
 
 }


[10/14] git commit: added missing fields switched a few date fields to Joda

Posted by sb...@apache.org.
added missing fields
switched a few date fields to Joda


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fec8a37c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fec8a37c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fec8a37c

Branch: refs/heads/master
Commit: fec8a37ca99df822616513727c618853a8a71fdc
Parents: 2e66e51
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:22:57 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500

----------------------------------------------------------------------
 .../main/jsonschema/com/datasift/Datasift.json  | 36 ++++++++++++++++++--
 .../com/datasift/DatasiftTwitterUser.json       |  9 ++++-
 2 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fec8a37c/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
index 8d4ef87..37a63a0 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
@@ -206,6 +206,25 @@
                 "link": {
                     "type": "string"
                 },
+                "mention_ids": {
+                    "type": "array",
+                    "items": [
+                        {
+                            "type": "integer"
+                        }
+                    ]
+                },
+                "mentions": {
+                    "type": "array",
+                    "items": [
+                        {
+                            "type": "string"
+                        }
+                    ]
+                },
+                "source": {
+                    "type": "string"
+                },
                 "schema": {
                     "dynamic": "true",
                     "properties": {
@@ -253,6 +272,9 @@
                 },
                 "tag": {
                     "type": "string"
+                },
+                "tag_extended": {
+                    "type": "string"
                 }
             }
         },
@@ -264,10 +286,10 @@
                 "code": {
                     "type": "array",
                     "items": [
-                            {
+                        {
                             "type": "integer"
-                            }
-                        ]
+                        }
+                    ]
                 },
                 "created_at": {
                     "type": "array",
@@ -576,6 +598,14 @@
                 "created_at": {
                     "type": "string"
                 },
+                "display_urls": {
+                    "type": "array",
+                    "items": [
+                        {
+                            "type": "string"
+                        }
+                    ]
+                },
                 "domains": {
                     "type": "array",
                     "items": [

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fec8a37c/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
index 97d93fe..3be49ff 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
@@ -4,11 +4,15 @@
     "javaType": "org.apache.streams.datasift.twitter.DatasiftTwitterUser",
     "properties": {
         "created_at": {
-            "type": "string"
+            "type": "string",
+            "format": "date-time"
         },
         "description": {
             "type": "string"
         },
+        "favourites_count": {
+            "type": "integer"
+        },
         "followers_count": {
             "type": "integer"
         },
@@ -39,6 +43,9 @@
         "profile_image_url": {
             "type": "string"
         },
+        "profile_image_url_https": {
+            "type": "string"
+        },
         "screen_name": {
             "type": "string"
         },


[09/14] git commit: additions to enable streams-api

Posted by sb...@apache.org.
additions to enable streams-api


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/eda5a3cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/eda5a3cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/eda5a3cb

Branch: refs/heads/master
Commit: eda5a3cb77f512fa4e1094a0bf74a3c35c63605b
Parents: cbfe01a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 16:59:23 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:26:33 2014 -0500

----------------------------------------------------------------------
 pom.xml                                                   |  2 +-
 streams-contrib/streams-provider-datasift/pom.xml         |  2 +-
 .../datasift/provider/DatasiftStreamConfigurator.java     | 10 ++++------
 .../streams/datasift/provider/DatasiftStreamProvider.java |  8 ++++++--
 4 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/eda5a3cb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index eebd5be..419ef89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <java.version>1.6</java.version>
+        <java.version>1.7</java.version>
         <org.osgi.service.http.port>8080</org.osgi.service.http.port>
         <org.osgi.service.http.port.secure>8443</org.osgi.service.http.port.secure>
         <jackson.version>2.3.2</jackson.version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/eda5a3cb/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml
index 5c5f674..3b9f96f 100644
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ b/streams-contrib/streams-provider-datasift/pom.xml
@@ -83,7 +83,7 @@
 
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <testSourceDirectory>src/test/java17</testSourceDirectory>
         <resources>
             <resource>
                 <directory>src/main/resources</directory>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/eda5a3cb/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
index d85d6e5..7025d39 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
@@ -19,7 +19,9 @@ under the License.
 package org.apache.streams.datasift.provider;
 
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
 import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,12 +34,8 @@ public class DatasiftStreamConfigurator {
 
     public static DatasiftConfiguration detectConfiguration(Config datasift) {
 
-        DatasiftConfiguration datasiftConfiguration = new DatasiftConfiguration();
-
-        datasiftConfiguration.setApiKey(datasift.getString("apiKey"));
-        datasiftConfiguration.setUserName(datasift.getString("userName"));
-        datasiftConfiguration.setStreamHash(datasift.getStringList("hashes"));
-
+        DatasiftConfiguration datasiftConfiguration;
+        datasiftConfiguration = StreamsJacksonMapper.getInstance().convertValue(datasift.root().render(ConfigRenderOptions.concise()), DatasiftConfiguration.class);
         return datasiftConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/eda5a3cb/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index 0dc4c92..09c01b0 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -55,7 +55,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamProvider.class);
 
     private DatasiftConfiguration config;
-    private ConcurrentLinkedQueue<Interaction> interactions;
+    private ConcurrentLinkedQueue<Interaction> interactions = new ConcurrentLinkedQueue<Interaction>();
     private Map<String, DataSiftClient> clients;
     private StreamEventListener eventListener;
     private ObjectMapper mapper;
@@ -69,6 +69,11 @@ public class DatasiftStreamProvider implements StreamsProvider {
         this(listener, null);
     }
 
+    // to set up a webhook we need to be able to return a reference to this queue
+    public Queue<Interaction> getInteractions() {
+        return interactions;
+    }
+
     /**
      * @param listener {@link com.datasift.client.stream.StreamEventListener} that handles deletion notices received from twitter.
      * @param config   Configuration to use
@@ -184,7 +189,6 @@ public class DatasiftStreamProvider implements StreamsProvider {
         return null;
     }
 
-    @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         return null;
     }


[13/14] git commit: Merge branch 'streams-api' of https://git-wip-us.apache.org/repos/asf/incubator-streams into streams-api

Posted by sb...@apache.org.
Merge branch 'streams-api' of https://git-wip-us.apache.org/repos/asf/incubator-streams into streams-api


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5dec4aad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5dec4aad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5dec4aad

Branch: refs/heads/master
Commit: 5dec4aadb5947614fb7beb8b631c017adaea0fec
Parents: e7141a4 01a1a73
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Jul 30 21:27:32 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Jul 30 21:27:32 2014 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------