You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/11 20:54:15 UTC
[08/32] incubator-streams git commit: a few package name changes
working twitterurlapiprocess which uses streams-http (w/o authentication)
working people pattern processors which use streams-http (w/ authentication)
a few package name changes
working twitterurlapiprocess which uses streams-http (w/o authentication)
working people pattern processors which use streams-http (w/ authentication)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0c8e67ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0c8e67ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0c8e67ce
Branch: refs/heads/STREAMS-212
Commit: 0c8e67ce26e131282aaaa03815ad6daefd684346
Parents: ad5f90c
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 23:13:29 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 23:13:29 2014 -0500
----------------------------------------------------------------------
.../http/processor/SimpleHTTPGetProcessor.java | 50 +++++++++++++++-----
.../peoplepattern/AccountTypeProcessor.java | 7 ++-
.../peoplepattern/DemographicsProcessor.java | 7 ++-
.../streams-provider-twitter/pom.xml | 2 +-
.../processor/TwitterUrlApiProcessor.java | 21 ++++----
.../provider/TwitterTimelineProvider.java | 8 ++--
.../apache/streams/data/util/ExtensionUtil.java | 37 ++++++---------
7 files changed, 77 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index 0d17cc6..d3d4429 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -21,6 +21,7 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.util.ExtensionUtil;
import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.ActivityObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,15 +100,30 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
}
}
- /**
- Override this to place result in non-standard location on document
- */
- protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+ /**
+ Override this to read the entity to extend from a non-standard location on the document
+ */
+ protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
+
+ if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+ return mapper.convertValue(rootDocument, ActivityObject.class);
+ else
+ return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class);
+
+ }
+
+ /**
+ Override this to write the extended entity back to a non-standard location on the document
+ */
+ protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) {
if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
- return rootDocument;
+ return mapper.convertValue(activityObject, ObjectNode.class);
else
- return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
+ rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class));
+
+ return rootDocument;
}
@@ -150,9 +166,6 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
try {
response.close();
} catch (IOException e) {}
- try {
- httpclient.close();
- } catch (IOException e) {}
}
if( entityString == null )
@@ -162,12 +175,12 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
ObjectNode extensionFragment = prepareExtensionFragment(entityString);
- ObjectNode extensionEntity = getEntityToExtend(rootDocument);
-
- ExtensionUtil.ensureExtensions(extensionEntity);
+ ActivityObject extensionEntity = getEntityToExtend(rootDocument);
ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
+ rootDocument = setEntityToExtend(rootDocument, extensionEntity);
+
entry.setDocument(rootDocument);
result.add(entry);
@@ -220,5 +233,18 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
@Override
public void cleanUp() {
LOGGER.info("shutting down SimpleHTTPGetProcessor");
+ try {
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ httpclient = null;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
index d180b7f..edcf4d3 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -18,7 +18,6 @@
package org.apache.streams.peoplepattern;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import org.apache.streams.components.http.HttpConfigurator;
import org.apache.streams.components.http.HttpProcessorConfiguration;
@@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.data.util.ExtensionUtil;
import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Actor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
@Override
protected Map<String, String> prepareParams(StreamsDatum entry) {
Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
- //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
Actor actor = activity.getActor();
- ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
- String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+ ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class);
+ String username = (String) ExtensionUtil.getExtension(actorObject, "screenName");
Map<String, String> params = Maps.newHashMap();
params.put("id", actor.getId());
params.put("name", actor.getDisplayName());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
index 6ffbb9b..60db379 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -18,7 +18,6 @@
package org.apache.streams.peoplepattern;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import org.apache.streams.components.http.HttpConfigurator;
import org.apache.streams.components.http.HttpProcessorConfiguration;
@@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.data.util.ExtensionUtil;
import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Actor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class DemographicsProcessor extends SimpleHTTPGetProcessor {
@Override
protected Map<String, String> prepareParams(StreamsDatum entry) {
Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
- //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
Actor actor = activity.getActor();
- ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
- String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+ ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class);
+ String username = (String) ExtensionUtil.getExtension(actorObject, "screenName");
Map<String, String> params = Maps.newHashMap();
params.put("id", actor.getId());
params.put("name", actor.getDisplayName());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index f0d65f8..604e5a7 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -55,7 +55,7 @@
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
- <artifactId>streams-processor-http</artifactId>
+ <artifactId>streams-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 438937f..54e1369 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -1,11 +1,10 @@
package org.apache.streams.twitter.processor;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.streams.components.http.HttpProcessorConfiguration;
-import org.apache.streams.components.http.SimpleHTTPGetProcessor;
-import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
@@ -21,23 +20,27 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St
public TwitterUrlApiProcessor() {
super();
this.configuration.setHostname("urls.api.twitter.com");
- this.configuration.setResourceUri("/1/urls/count.json");
+ this.configuration.setResourcePath("/1/urls/count.json");
+ this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
this.configuration.setExtension("twitter_url_count");
}
public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
super(processorConfiguration);
this.configuration.setHostname("urls.api.twitter.com");
- this.configuration.setResourceUri("/1/urls/count.json");
+ this.configuration.setResourcePath("/1/urls/count.json");
+ this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
this.configuration.setExtension("twitter_url_count");
}
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- Activity activity = mapper.convertValue(entry, Activity.class);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(activity.getUrl()));
- return super.process(entry);
+ Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+ if( activity.getLinks() != null && activity.getLinks().size() > 0)
+ return super.process(entry);
+ else
+ return Lists.newArrayList(entry);
}
@Override
@@ -45,7 +48,7 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St
Map<String, String> params = Maps.newHashMap();
- params.put("url", mapper.convertValue(entry, Activity.class).getUrl());
+ params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0));
return params;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index ae755c2..86395a2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -19,9 +19,9 @@
package org.apache.streams.twitter.provider;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
@@ -31,14 +31,16 @@ import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
index a8d068a..7ce013c 100644
--- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -1,7 +1,6 @@
package org.apache.streams.data.util;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.ActivityObject;
@@ -41,38 +40,33 @@ public class ExtensionUtil {
private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- public static Map<String, Object> getExtensions(ObjectNode object) {
+ public static Map<String, Object> getExtensions(ActivityObject object) {
ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ Map<String,Object> extensions = ensureExtensions(object);
return extensions;
}
- public static Object getExtension(ObjectNode object, String key) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ public static Object getExtension(ActivityObject object, String key) {
+ Map<String,Object> extensions = ensureExtensions(object);
return extensions.get(key);
}
- public static void setExtensions(ObjectNode object, Map<String, Object> extensions) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- activityObject.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
+ public static void setExtensions(ActivityObject object, Map<String, Object> extensions) {
+ object.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
};
- public static void addExtension(ObjectNode object, String key, Object extension) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ public static void addExtension(ActivityObject object, String key, Object extension) {
+ Map<String,Object> extensions = ensureExtensions(object);
extensions.put(key, extension);
};
- public static void addExtensions(ObjectNode object, Map<String, Object> extensions) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ public static void addExtensions(ActivityObject object, Map<String, Object> extensions) {
for( Map.Entry<String, Object> item : extensions.entrySet())
- activityObject.getAdditionalProperties().put(item.getKey(), item.getValue());
+ addExtension(object, item.getKey(), item.getValue());
};
- public static void removeExtension(ObjectNode object, String key) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ public static void removeExtension(ActivityObject object, String key) {
+ Map<String,Object> extensions = ensureExtensions(object);
extensions.remove(key);
};
@@ -82,13 +76,12 @@ public class ExtensionUtil {
* @return the Map representing the extensions property
*/
@SuppressWarnings("unchecked")
- public static Map<String, Object> ensureExtensions(ObjectNode object) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ public static Map<String, Object> ensureExtensions(ActivityObject object) {
+ Map<String,Object> extensions = (Map<String,Object>) object.getAdditionalProperties().get(EXTENSION_PROPERTY);
if(extensions == null) {
extensions = Maps.newHashMap();
setExtensions(object, extensions);
}
- return getExtensions(object);
+ return extensions;
}
}