You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/08 21:13:01 UTC

[10/38] incubator-streams git commit: a few package name changes; working TwitterUrlApiProcessor which uses streams-http (w/o authentication); working people pattern processors which use streams-http (w/ authentication)

A few package name changes.
Working TwitterUrlApiProcessor, which uses streams-http (w/o authentication).
Working people pattern processors, which use streams-http (w/ authentication).


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0c8e67ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0c8e67ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0c8e67ce

Branch: refs/heads/STREAMS-49
Commit: 0c8e67ce26e131282aaaa03815ad6daefd684346
Parents: ad5f90c
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 23:13:29 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 23:13:29 2014 -0500

----------------------------------------------------------------------
 .../http/processor/SimpleHTTPGetProcessor.java  | 50 +++++++++++++++-----
 .../peoplepattern/AccountTypeProcessor.java     |  7 ++-
 .../peoplepattern/DemographicsProcessor.java    |  7 ++-
 .../streams-provider-twitter/pom.xml            |  2 +-
 .../processor/TwitterUrlApiProcessor.java       | 21 ++++----
 .../provider/TwitterTimelineProvider.java       |  8 ++--
 .../apache/streams/data/util/ExtensionUtil.java | 37 ++++++---------
 7 files changed, 77 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index 0d17cc6..d3d4429 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -21,6 +21,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.data.util.ExtensionUtil;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.ActivityObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,15 +100,30 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
         }
 
     }
-        /**
-         Override this to place result in non-standard location on document
-         */
-    protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+    /**
+     Override this to place result in non-standard location on document
+     */
+    protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
+
+        if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+            return mapper.convertValue(rootDocument, ActivityObject.class);
+        else
+            return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class);
+
+    }
+
+    /**
+     Override this to place result in non-standard location on document
+     */
+    protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) {
 
         if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
-            return rootDocument;
+            return mapper.convertValue(activityObject, ObjectNode.class);
         else
-            return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
+            rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class));
+
+        return rootDocument;
 
     }
 
@@ -150,9 +166,6 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
             try {
                 response.close();
             } catch (IOException e) {}
-            try {
-                httpclient.close();
-            } catch (IOException e) {}
         }
 
         if( entityString == null )
@@ -162,12 +175,12 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
 
         ObjectNode extensionFragment = prepareExtensionFragment(entityString);
 
-        ObjectNode extensionEntity = getEntityToExtend(rootDocument);
-
-        ExtensionUtil.ensureExtensions(extensionEntity);
+        ActivityObject extensionEntity = getEntityToExtend(rootDocument);
 
         ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
 
+        rootDocument = setEntityToExtend(rootDocument, extensionEntity);
+
         entry.setDocument(rootDocument);
 
         result.add(entry);
@@ -220,5 +233,18 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
     @Override
     public void cleanUp() {
         LOGGER.info("shutting down SimpleHTTPGetProcessor");
+        try {
+            httpclient.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                httpclient.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            } finally {
+                httpclient = null;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
index d180b7f..edcf4d3 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.peoplepattern;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
 import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
@@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.data.util.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
     @Override
     protected Map<String, String> prepareParams(StreamsDatum entry) {
         Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
-        //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
         Actor actor = activity.getActor();
-        ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
-        String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+        ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class);
+        String username = (String) ExtensionUtil.getExtension(actorObject, "screenName");
         Map<String, String> params = Maps.newHashMap();
         params.put("id", actor.getId());
         params.put("name", actor.getDisplayName());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
index 6ffbb9b..60db379 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.peoplepattern;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
 import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
@@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.data.util.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class DemographicsProcessor extends SimpleHTTPGetProcessor {
     @Override
     protected Map<String, String> prepareParams(StreamsDatum entry) {
         Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
-        //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
         Actor actor = activity.getActor();
-        ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
-        String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+        ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class);
+        String username = (String) ExtensionUtil.getExtension(actorObject, "screenName");
         Map<String, String> params = Maps.newHashMap();
         params.put("id", actor.getId());
         params.put("name", actor.getDisplayName());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index f0d65f8..604e5a7 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -55,7 +55,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
-            <artifactId>streams-processor-http</artifactId>
+            <artifactId>streams-http</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 438937f..54e1369 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -1,11 +1,10 @@
 package org.apache.streams.twitter.processor;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
-import org.apache.streams.components.http.SimpleHTTPGetProcessor;
-import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
@@ -21,23 +20,27 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St
     public TwitterUrlApiProcessor() {
         super();
         this.configuration.setHostname("urls.api.twitter.com");
-        this.configuration.setResourceUri("/1/urls/count.json");
+        this.configuration.setResourcePath("/1/urls/count.json");
+        this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
         this.configuration.setExtension("twitter_url_count");
     }
 
     public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
         super(processorConfiguration);
         this.configuration.setHostname("urls.api.twitter.com");
-        this.configuration.setResourceUri("/1/urls/count.json");
+        this.configuration.setResourcePath("/1/urls/count.json");
+        this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
         this.configuration.setExtension("twitter_url_count");
     }
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
         Preconditions.checkArgument(entry.getDocument() instanceof Activity);
-        Activity activity = mapper.convertValue(entry, Activity.class);
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(activity.getUrl()));
-        return super.process(entry);
+        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        if( activity.getLinks() != null && activity.getLinks().size() > 0)
+            return super.process(entry);
+        else
+            return Lists.newArrayList(entry);
     }
 
     @Override
@@ -45,7 +48,7 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St
 
         Map<String, String> params = Maps.newHashMap();
 
-        params.put("url", mapper.convertValue(entry, Activity.class).getUrl());
+        params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0));
 
         return params;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index ae755c2..86395a2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -19,9 +19,9 @@
 package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -31,14 +31,16 @@ import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 import twitter4j.*;
 import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
index a8d068a..7ce013c 100644
--- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -1,7 +1,6 @@
 package org.apache.streams.data.util;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.ActivityObject;
@@ -41,38 +40,33 @@ public class ExtensionUtil {
 
     private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    public static Map<String, Object> getExtensions(ObjectNode object) {
+    public static Map<String, Object> getExtensions(ActivityObject object) {
         ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+        Map<String,Object> extensions = ensureExtensions(object);
         return extensions;
     }
 
-    public static Object getExtension(ObjectNode object, String key) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+    public static Object getExtension(ActivityObject object, String key) {
+        Map<String,Object> extensions = ensureExtensions(object);
         return extensions.get(key);
     }
 
-    public static void setExtensions(ObjectNode object, Map<String, Object> extensions) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        activityObject.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
+    public static void setExtensions(ActivityObject object, Map<String, Object> extensions) {
+        object.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
     };
 
-    public static void addExtension(ObjectNode object, String key, Object extension) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+    public static void addExtension(ActivityObject object, String key, Object extension) {
+        Map<String,Object> extensions = ensureExtensions(object);
         extensions.put(key, extension);
     };
 
-    public static void addExtensions(ObjectNode object, Map<String, Object> extensions) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+    public static void addExtensions(ActivityObject object, Map<String, Object> extensions) {
         for( Map.Entry<String, Object> item : extensions.entrySet())
-            activityObject.getAdditionalProperties().put(item.getKey(), item.getValue());
+            addExtension(object, item.getKey(), item.getValue());
     };
 
-    public static void removeExtension(ObjectNode object, String key) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+    public static void removeExtension(ActivityObject object, String key) {
+        Map<String,Object> extensions = ensureExtensions(object);
         extensions.remove(key);
     };
 
@@ -82,13 +76,12 @@ public class ExtensionUtil {
      * @return the Map representing the extensions property
      */
     @SuppressWarnings("unchecked")
-    public static Map<String, Object> ensureExtensions(ObjectNode object) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+    public static Map<String, Object> ensureExtensions(ActivityObject object) {
+        Map<String,Object> extensions = (Map<String,Object>) object.getAdditionalProperties().get(EXTENSION_PROPERTY);
         if(extensions == null) {
             extensions = Maps.newHashMap();
             setExtensions(object, extensions);
         }
-        return getExtensions(object);
+        return extensions;
     }
 }