You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/04 22:48:31 UTC

[1/2] git commit: Fixed NPE in DatasiftTypeConverterProcessor. Changed DatasiftStreamProvider to output interactions as Strings, since Interactions are not serializable.

Repository: incubator-streams
Updated Branches:
  refs/heads/master 8d554374a -> 55417ffe0


Fixed NPE in DatasiftTypeConverterProcessor.  Changed DatasiftStreamProvider to output interactions as Strings, since Interactions are not serializable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0232a3a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0232a3a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0232a3a0

Branch: refs/heads/master
Commit: 0232a3a0c943c08ecdbe257fa9ebcab959d1dc87
Parents: f4749b9
Author: rebanks <re...@w2odigital.com>
Authored: Wed May 28 14:16:16 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Wed May 28 14:16:16 2014 -0500

----------------------------------------------------------------------
 .../datasift/provider/DatasiftConverter.java    |  37 ++++++
 .../provider/DatasiftEventProcessor.java        | 102 ----------------
 .../provider/DatasiftStreamProvider.java        |  39 ++++--
 .../provider/DatasiftTypeConverter.java         |  37 ------
 .../DatasiftTypeConverterProcessor.java         | 121 +++++++++++++++++++
 .../DatasiftTypeConverterProcessorTest.java     |  55 +++++++++
 6 files changed, 241 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0232a3a0/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
new file mode 100644
index 0000000..fa614b0
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Converts a {@link org.apache.streams.datasift.Datasift} object to a StreamsDatum
+ */
+public interface DatasiftConverter {
+
+    /**
+     * Converts a datasift related object to the desired resulting object.
+     * @param toConvert
+     * @param mapper
+     * @return
+     */
+    public Object convert(Object toConvert, ObjectMapper mapper);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0232a3a0/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
deleted file mode 100644
index 09d618c..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.streams.datasift.provider;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class DatasiftEventProcessor implements StreamsProcessor {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftEventProcessor.class);
-
-    private ObjectMapper mapper;
-    private Class outClass;
-    private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
-    private DatasiftTypeConverter converter;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public DatasiftEventProcessor(Class outClass) {
-        this.outClass = outClass;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = Lists.newLinkedList();
-        Object item = entry.getDocument();
-        try {
-            Datasift datasift = mapper.convertValue(item, Datasift.class);
-            result.add(this.converter.convert(datasift));
-        } catch (Exception e) {
-            LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
-        }
-        return result;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.mapper = StreamsJacksonMapper.getInstance();
-        this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
-        if(this.outClass.equals(Activity.class)) {
-            this.converter = new ActivityConverter();
-        } else if (this.outClass.equals(String.class)) {
-            this.converter = new StringConverter();
-        } else {
-            throw new NotImplementedException("No converter implemented for class : "+this.outClass.getName());
-        }
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-    private class ActivityConverter implements DatasiftTypeConverter {
-        @Override
-        public StreamsDatum convert(Datasift datasift) {
-            return new StreamsDatum(datasiftInteractionActivitySerializer.deserialize(datasift), datasift.getInteraction().getId());
-        }
-    }
-
-    private class StringConverter implements DatasiftTypeConverter {
-        @Override
-        public StreamsDatum convert(Datasift datasift) {
-            try {
-                return new StreamsDatum(mapper.writeValueAsString(datasift), datasift.getInteraction().getId());
-            } catch (JsonProcessingException jpe) {
-                throw new RuntimeException(jpe);
-            }
-        }
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0232a3a0/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index 25229ef..96466ea 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -24,6 +24,8 @@ import com.datasift.client.core.Stream;
 import com.datasift.client.stream.DeletedInteraction;
 import com.datasift.client.stream.Interaction;
 import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
@@ -33,6 +35,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,9 +58,11 @@ public class DatasiftStreamProvider implements StreamsProvider {
     private ConcurrentLinkedQueue<Interaction> interactions;
     private Map<String, DataSiftClient> clients;
     private StreamEventListener eventListener;
+    private ObjectMapper mapper;
 
     /**
      * Constructor that searches for available configurations
+     *
      * @param listener {@link com.datasift.client.stream.StreamEventListener} that handles deletion notices received from twitter.
      */
     public DatasiftStreamProvider(StreamEventListener listener) {
@@ -65,12 +70,11 @@ public class DatasiftStreamProvider implements StreamsProvider {
     }
 
     /**
-     *
      * @param listener {@link com.datasift.client.stream.StreamEventListener} that handles deletion notices received from twitter.
-     * @param config  Configuration to use
+     * @param config   Configuration to use
      */
     public DatasiftStreamProvider(StreamEventListener listener, DatasiftConfiguration config) {
-        if(config == null) {
+        if (config == null) {
             Config datasiftConfig = StreamsConfigurator.config.getConfig("datasift");
             this.config = DatasiftStreamConfigurator.detectConfiguration(datasiftConfig);
         } else {
@@ -89,7 +93,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
         Preconditions.checkNotNull(this.config.getUserName());
         Preconditions.checkNotNull(this.clients);
 
-        for( String hash : this.config.getStreamHash()) {
+        for (String hash : this.config.getStreamHash()) {
             startStreamForHash(hash);
         }
 
@@ -97,6 +101,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
 
     /**
      * Creates a connection to datasift and starts collection of data from the resulting string.
+     *
      * @param streamHash
      */
     public void startStreamForHash(String streamHash) {
@@ -113,6 +118,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
 
     /**
      * Exposed for testing purposes.
+     *
      * @param userName
      * @param apiKey
      * @return
@@ -124,11 +130,12 @@ public class DatasiftStreamProvider implements StreamsProvider {
 
     /**
      * If a stream has been opened for the supplied stream hash, that stream will be shutdown.
+     *
      * @param streamHash
      */
     public void shutDownStream(String streamHash) {
         synchronized (clients) {
-            if(!this.clients.containsKey(streamHash))
+            if (!this.clients.containsKey(streamHash))
                 return;
             DataSiftClient client = this.clients.get(streamHash);
             LOGGER.debug("Shutting down stream for hash: {}", streamHash);
@@ -142,22 +149,31 @@ public class DatasiftStreamProvider implements StreamsProvider {
      */
     public void stop() {
         synchronized (clients) {
-            for(DataSiftClient client : this.clients.values()) {
+            for (DataSiftClient client : this.clients.values()) {
                 client.shutdown();
             }
         }
     }
 
     // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
-    @Override //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
+    @Override
+    //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
     public StreamsResultSet readCurrent() {
         Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
-
-            while(!this.interactions.isEmpty()) {
-                Interaction interaction = this.interactions.poll();
-                while(!datums.offer(new StreamsDatum(interaction, interaction.getData().get("interaction").get("id").textValue()))) {
+        StreamsDatum datum = null;
+        Interaction interaction;
+        while (!this.interactions.isEmpty()) {
+            interaction = this.interactions.poll();
+            try {
+                datum = new StreamsDatum(this.mapper.writeValueAsString(this.interactions.poll()), interaction.getData().get("interaction").get("id").textValue());
+            } catch (JsonProcessingException jpe) {
+                LOGGER.error("Exception while converting Interaction to String : {}", jpe);
+            }
+            if (datum != null) {
+                while (!datums.offer(datum)) {
                     Thread.yield();
                 }
+            }
 
         }
         return new StreamsResultSet(datums);
@@ -177,6 +193,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
     public void prepare(Object configurationObject) {
         this.interactions = new ConcurrentLinkedQueue<Interaction>();
         this.clients = Maps.newHashMap();
+        this.mapper = StreamsJacksonMapper.getInstance();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0232a3a0/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverter.java
deleted file mode 100644
index 9fc90e6..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.streams.datasift.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.datasift.Datasift;
-
-/**
- * Converts a {@link org.apache.streams.datasift.Datasift} object to a StreamsDatum
- */
-public interface DatasiftTypeConverter {
-
-    /**
-     * Converts a {@link org.apache.streams.datasift.Datasift} object to a StreamsDatum
-     * @param datasift
-     * @return
-     */
-    public StreamsDatum convert(Datasift datasift);
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0232a3a0/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
new file mode 100644
index 0000000..1e994fb
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
@@ -0,0 +1,121 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class DatasiftTypeConverterProcessor implements StreamsProcessor {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
+
+    private ObjectMapper mapper;
+    private Class outClass;
+    private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
+    private DatasiftConverter converter;
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public DatasiftTypeConverterProcessor(Class outClass) {
+        this.outClass = outClass;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newLinkedList();
+        Object doc;
+        try {
+            doc = this.converter.convert(entry.getDocument(), this.mapper);
+            if(doc != null) {
+                result.add(new StreamsDatum(doc, entry.getId()));
+            }
+        } catch (Exception e) {
+            LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
+        }
+        return result;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        this.mapper = StreamsJacksonMapper.getInstance();
+        this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
+        if(this.outClass.equals(Activity.class)) {
+            this.converter = new ActivityConverter();
+        } else if (this.outClass.equals(String.class)) {
+            this.converter = new StringConverter();
+        } else {
+            throw new NotImplementedException("No converter implemented for class : "+this.outClass.getName());
+        }
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+    private class ActivityConverter implements DatasiftConverter {
+
+        @Override
+        public Object convert(Object toConvert, ObjectMapper mapper) {
+            if(toConvert instanceof Activity)
+                return toConvert;
+            try {
+                if(toConvert instanceof String)
+                    return mapper.readValue((String)toConvert, Activity.class);
+                return mapper.convertValue(toConvert, Activity.class);
+            } catch (Exception e) {
+                LOGGER.error("Exception while trying to convert {} to a Activity.", toConvert.getClass());
+                LOGGER.error("Exception : {}", e);
+                return null;
+            }
+        }
+
+
+    }
+
+    private class StringConverter implements DatasiftConverter {
+        @Override
+        public Object convert(Object toConvert, ObjectMapper mapper) {
+            if(toConvert instanceof String)
+                return toConvert;
+            try {
+                return mapper.writeValueAsString(toConvert);
+            } catch (Exception e) {
+                LOGGER.error("Exception while trying to write {} as a String.", toConvert.getClass());
+                LOGGER.error("Exception : {}", e);
+                return null;
+            }
+        }
+
+
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0232a3a0/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
new file mode 100644
index 0000000..4552a9b
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
@@ -0,0 +1,55 @@
+package org.apache.streams.datasift.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Test;
+
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
+/**
+ *
+ */
+public class DatasiftTypeConverterProcessorTest {
+
+    private static final String DATASIFT_JSON = "{\"_index\":\"major_music_artist_datasift_20140527\",\"_type\":\"datasift\",\"_id\":\"1e3e5ef97532a580e0741841f5746728\",\"_version\":1,\"found\":true, \"_source\" : {\"data\":{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee  look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharr
 ell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\"confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"O
 fficial Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.you
 tube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"title\":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee  look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no será la fiesta que todos esperamos, pero mientras estemos aquí debemos BAILAR!!! #ErasmusOnceErasmusForever\",\"location\":\"España..Olé!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg
 .com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":70931384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq…\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificación en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_
 count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}}}";
+
+    @Test
+    public void testTypeConverterToString() {
+        final String ID = "1";
+        StreamsProcessor processor = new DatasiftTypeConverterProcessor(String.class);
+        processor.prepare(null);
+        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertNotNull(resultDatum.getDocument());
+        assertTrue(resultDatum.getDocument() instanceof String);
+        assertEquals(ID, resultDatum.getId());
+    }
+
+    @Test
+    public void testTypeConverterToActivity() {
+        final String ID = "1";
+        StreamsProcessor processor = new DatasiftTypeConverterProcessor(Activity.class);
+        processor.prepare(null);
+        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertNotNull(resultDatum.getDocument());
+        assertTrue(resultDatum.getDocument() instanceof Activity);
+        assertEquals(ID, resultDatum.getId());
+    }
+
+
+
+}


[2/2] git commit: Merge PR#33 from 'rbnks/STREAMS-102'

Posted by mf...@apache.org.
Merge PR#33 from 'rbnks/STREAMS-102'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/55417ffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/55417ffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/55417ffe

Branch: refs/heads/master
Commit: 55417ffe061676264f164f9a30d007f58289cf8f
Parents: 8d55437 0232a3a
Author: mfranklin <mf...@apache.org>
Authored: Wed Jun 4 16:47:50 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jun 4 16:47:50 2014 -0400

----------------------------------------------------------------------
 .../datasift/provider/DatasiftConverter.java    |  37 ++++++
 .../provider/DatasiftEventProcessor.java        | 102 ----------------
 .../provider/DatasiftStreamProvider.java        |  39 ++++--
 .../provider/DatasiftTypeConverter.java         |  37 ------
 .../DatasiftTypeConverterProcessor.java         | 121 +++++++++++++++++++
 .../DatasiftTypeConverterProcessorTest.java     |  55 +++++++++
 6 files changed, 241 insertions(+), 150 deletions(-)
----------------------------------------------------------------------