You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/07 20:11:32 UTC
[01/12] incubator-streams git commit: just pons
Repository: incubator-streams
Updated Branches:
refs/heads/master b8e7a69d0 -> ab9c6963d
just pons
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1464819f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1464819f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1464819f
Branch: refs/heads/master
Commit: 1464819fd7ea147830a4211ea2ac6af4aa715022
Parents: 35a8fbf
Author: sblackmon <sb...@apache.org>
Authored: Sun Sep 14 10:24:47 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Sun Sep 14 10:24:47 2014 -0500
----------------------------------------------------------------------
streams-components/pom.xml | 62 ++++++++++
.../streams-processor-http/pom.xml | 114 +++++++++++++++++++
2 files changed, 176 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1464819f/streams-components/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/pom.xml b/streams-components/pom.xml
new file mode 100644
index 0000000..26384b1
--- /dev/null
+++ b/streams-components/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-project</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-components</artifactId>
+
+ <packaging>pom</packaging>
+ <name>streams-components</name>
+
+ <properties>
+
+ </properties>
+
+ <modules>
+ <module>streams-processor-http</module>
+ </modules>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1464819f/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml
new file mode 100644
index 0000000..67538aa
--- /dev/null
+++ b/streams-components/streams-processor-http/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-project</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>streams-processor-http</artifactId>
+
+ <name>streams-processor-http</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-source-jaxb2</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jaxb2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.http</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
[08/12] incubator-streams git commit: a few package name changes
working twitterurlapiprocess which uses streams-http (w/o authentication)
working people pattern processors which use streams-http (w/ authentication)
Posted by sb...@apache.org.
a few package name changes
working twitterurlapiprocess which uses streams-http (w/o authentication)
working people pattern processors which use streams-http (w/ authentication)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0c8e67ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0c8e67ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0c8e67ce
Branch: refs/heads/master
Commit: 0c8e67ce26e131282aaaa03815ad6daefd684346
Parents: ad5f90c
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 23:13:29 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 23:13:29 2014 -0500
----------------------------------------------------------------------
.../http/processor/SimpleHTTPGetProcessor.java | 50 +++++++++++++++-----
.../peoplepattern/AccountTypeProcessor.java | 7 ++-
.../peoplepattern/DemographicsProcessor.java | 7 ++-
.../streams-provider-twitter/pom.xml | 2 +-
.../processor/TwitterUrlApiProcessor.java | 21 ++++----
.../provider/TwitterTimelineProvider.java | 8 ++--
.../apache/streams/data/util/ExtensionUtil.java | 37 ++++++---------
7 files changed, 77 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index 0d17cc6..d3d4429 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -21,6 +21,7 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.util.ExtensionUtil;
import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.ActivityObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,15 +100,30 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
}
}
- /**
- Override this to place result in non-standard location on document
- */
- protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+ /**
+ Override this to place result in non-standard location on document
+ */
+ protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
+
+ if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+ return mapper.convertValue(rootDocument, ActivityObject.class);
+ else
+ return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class);
+
+ }
+
+ /**
+ Override this to place result in non-standard location on document
+ */
+ protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) {
if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
- return rootDocument;
+ return mapper.convertValue(activityObject, ObjectNode.class);
else
- return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
+ rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class));
+
+ return rootDocument;
}
@@ -150,9 +166,6 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
try {
response.close();
} catch (IOException e) {}
- try {
- httpclient.close();
- } catch (IOException e) {}
}
if( entityString == null )
@@ -162,12 +175,12 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
ObjectNode extensionFragment = prepareExtensionFragment(entityString);
- ObjectNode extensionEntity = getEntityToExtend(rootDocument);
-
- ExtensionUtil.ensureExtensions(extensionEntity);
+ ActivityObject extensionEntity = getEntityToExtend(rootDocument);
ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
+ rootDocument = setEntityToExtend(rootDocument, extensionEntity);
+
entry.setDocument(rootDocument);
result.add(entry);
@@ -220,5 +233,18 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
@Override
public void cleanUp() {
LOGGER.info("shutting down SimpleHTTPGetProcessor");
+ try {
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ httpclient = null;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
index d180b7f..edcf4d3 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -18,7 +18,6 @@
package org.apache.streams.peoplepattern;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import org.apache.streams.components.http.HttpConfigurator;
import org.apache.streams.components.http.HttpProcessorConfiguration;
@@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.data.util.ExtensionUtil;
import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Actor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
@Override
protected Map<String, String> prepareParams(StreamsDatum entry) {
Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
- //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
Actor actor = activity.getActor();
- ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
- String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+ ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class);
+ String username = (String) ExtensionUtil.getExtension(actorObject, "screenName");
Map<String, String> params = Maps.newHashMap();
params.put("id", actor.getId());
params.put("name", actor.getDisplayName());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
index 6ffbb9b..60db379 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -18,7 +18,6 @@
package org.apache.streams.peoplepattern;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import org.apache.streams.components.http.HttpConfigurator;
import org.apache.streams.components.http.HttpProcessorConfiguration;
@@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.data.util.ExtensionUtil;
import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Actor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class DemographicsProcessor extends SimpleHTTPGetProcessor {
@Override
protected Map<String, String> prepareParams(StreamsDatum entry) {
Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
- //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
Actor actor = activity.getActor();
- ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
- String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+ ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class);
+ String username = (String) ExtensionUtil.getExtension(actorObject, "screenName");
Map<String, String> params = Maps.newHashMap();
params.put("id", actor.getId());
params.put("name", actor.getDisplayName());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index f0d65f8..604e5a7 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -55,7 +55,7 @@
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
- <artifactId>streams-processor-http</artifactId>
+ <artifactId>streams-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 438937f..54e1369 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -1,11 +1,10 @@
package org.apache.streams.twitter.processor;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.streams.components.http.HttpProcessorConfiguration;
-import org.apache.streams.components.http.SimpleHTTPGetProcessor;
-import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
@@ -21,23 +20,27 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St
public TwitterUrlApiProcessor() {
super();
this.configuration.setHostname("urls.api.twitter.com");
- this.configuration.setResourceUri("/1/urls/count.json");
+ this.configuration.setResourcePath("/1/urls/count.json");
+ this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
this.configuration.setExtension("twitter_url_count");
}
public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
super(processorConfiguration);
this.configuration.setHostname("urls.api.twitter.com");
- this.configuration.setResourceUri("/1/urls/count.json");
+ this.configuration.setResourcePath("/1/urls/count.json");
+ this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
this.configuration.setExtension("twitter_url_count");
}
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
Preconditions.checkArgument(entry.getDocument() instanceof Activity);
- Activity activity = mapper.convertValue(entry, Activity.class);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(activity.getUrl()));
- return super.process(entry);
+ Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+ if( activity.getLinks() != null && activity.getLinks().size() > 0)
+ return super.process(entry);
+ else
+ return Lists.newArrayList(entry);
}
@Override
@@ -45,7 +48,7 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St
Map<String, String> params = Maps.newHashMap();
- params.put("url", mapper.convertValue(entry, Activity.class).getUrl());
+ params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0));
return params;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index ae755c2..86395a2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -19,9 +19,9 @@
package org.apache.streams.twitter.provider;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
@@ -31,14 +31,16 @@ import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
index a8d068a..7ce013c 100644
--- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -1,7 +1,6 @@
package org.apache.streams.data.util;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.ActivityObject;
@@ -41,38 +40,33 @@ public class ExtensionUtil {
private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- public static Map<String, Object> getExtensions(ObjectNode object) {
+ public static Map<String, Object> getExtensions(ActivityObject object) {
ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ Map<String,Object> extensions = ensureExtensions(object);
return extensions;
}
- public static Object getExtension(ObjectNode object, String key) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ public static Object getExtension(ActivityObject object, String key) {
+ Map<String,Object> extensions = ensureExtensions(object);
return extensions.get(key);
}
- public static void setExtensions(ObjectNode object, Map<String, Object> extensions) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- activityObject.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
+ public static void setExtensions(ActivityObject object, Map<String, Object> extensions) {
+ object.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
};
- public static void addExtension(ObjectNode object, String key, Object extension) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ public static void addExtension(ActivityObject object, String key, Object extension) {
+ Map<String,Object> extensions = ensureExtensions(object);
extensions.put(key, extension);
};
- public static void addExtensions(ObjectNode object, Map<String, Object> extensions) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ public static void addExtensions(ActivityObject object, Map<String, Object> extensions) {
for( Map.Entry<String, Object> item : extensions.entrySet())
- activityObject.getAdditionalProperties().put(item.getKey(), item.getValue());
+ addExtension(object, item.getKey(), item.getValue());
};
- public static void removeExtension(ObjectNode object, String key) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ public static void removeExtension(ActivityObject object, String key) {
+ Map<String,Object> extensions = ensureExtensions(object);
extensions.remove(key);
};
@@ -82,13 +76,12 @@ public class ExtensionUtil {
* @return the Map representing the extensions property
*/
@SuppressWarnings("unchecked")
- public static Map<String, Object> ensureExtensions(ObjectNode object) {
- ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
- Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ public static Map<String, Object> ensureExtensions(ActivityObject object) {
+ Map<String,Object> extensions = (Map<String,Object>) object.getAdditionalProperties().get(EXTENSION_PROPERTY);
if(extensions == null) {
extensions = Maps.newHashMap();
setExtensions(object, extensions);
}
- return getExtensions(object);
+ return extensions;
}
}
[10/12] incubator-streams git commit: simple Integration Test
Posted by sb...@apache.org.
simple Integration Test
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbd7d890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbd7d890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbd7d890
Branch: refs/heads/master
Commit: dbd7d890804391025cf085f8a1c141a643d0b94b
Parents: b8e7a69
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 09:45:32 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 7 10:47:21 2014 -0800
----------------------------------------------------------------------
pom.xml | 56 ++++++++++++++++++++
streams-contrib/streams-persist-console/pom.xml | 34 ++++++++++++
.../streams/console/ConsolePersistReader.java | 11 ++--
.../streams/console/ConsolePersistWriter.java | 11 ++--
4 files changed, 103 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbd7d890/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f8b6a14..e86f02d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,8 @@
<jaxb2-basics.version>0.8.4</jaxb2-basics.version>
<jaxbutil.version>1.2.6</jaxbutil.version>
<junit.version>4.11</junit.version>
+ <surefire.plugin.version>2.17</surefire.plugin.version>
+ <failsafe.plugin.version>2.17</failsafe.plugin.version>
<slf4j.version>1.7.6</slf4j.version>
<logback.version>1.1.1</logback.version>
<commons-io.version>2.4</commons-io.version>
@@ -90,6 +92,7 @@
<facebook4j.version>2.1.0</facebook4j.version>
<maven.enforcer.plugin.version>1.3.1</maven.enforcer.plugin.version>
<mockito.version>1.9.5</mockito.version>
+ <powermock.version>1.5.6</powermock.version>
</properties>
<modules>
@@ -177,6 +180,41 @@
<artifactId>build-helper-maven-plugin</artifactId>
<version>${build-helper.version}</version>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>${failsafe.plugin.version}</version>
+ <executions>
+ <execution>
+ <id>integration-test</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <!-- Sets the VM argument line used when integration tests are run. -->
+ <argLine>${failsafeArgLine}</argLine>
+ <!-- Skips integration tests if the value of skip.integration.tests property is true -->
+ <skipTests>${skip.integration.tests}</skipTests>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${surefire.plugin.version}</version>
+ <configuration>
+ <!-- Sets the VM argument line used when unit tests are run. -->
+ <argLine>${surefireArgLine}</argLine>
+ <!-- Skips unit tests if the value of skip.unit.tests property is true -->
+ <skipTests>${skip.unit.tests}</skipTests>
+ <!-- Excludes integration tests when unit tests are run. -->
+ <excludes>
+ <exclude>**/IT*.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
</plugins>
</pluginManagement>
</build>
@@ -231,6 +269,24 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbd7d890/streams-contrib/streams-persist-console/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/pom.xml b/streams-contrib/streams-persist-console/pom.xml
index c7f2cd3..02ec403 100644
--- a/streams-contrib/streams-persist-console/pom.xml
+++ b/streams-contrib/streams-persist-console/pom.xml
@@ -26,5 +26,39 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtime-local</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ </dependency>
</dependencies>
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ </build>
+
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbd7d890/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
index 776d5a3..8afba85 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.InputStream;
+import java.io.PrintStream;
import java.math.BigInteger;
import java.util.Queue;
import java.util.Scanner;
@@ -44,16 +45,16 @@ public class ConsolePersistReader implements StreamsPersistReader {
protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ protected InputStream inputStream = System.in;
public ConsolePersistReader() {
this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
}
- public ConsolePersistReader(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
+ public ConsolePersistReader(InputStream inputStream) {
+ this();
+ this.inputStream = inputStream;
}
-
public void prepare(Object o) {
}
@@ -77,7 +78,7 @@ public class ConsolePersistReader implements StreamsPersistReader {
LOGGER.info("{} readCurrent", STREAMS_ID);
- Scanner sc = new Scanner(System.in);
+ Scanner sc = new Scanner(inputStream);
while( sc.hasNextLine() ) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbd7d890/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index 96d116f..53bb8d7 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -27,6 +27,7 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.PrintStream;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -34,6 +35,8 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
+ protected PrintStream printStream = System.out;
+
protected volatile Queue<StreamsDatum> persistQueue;
private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@@ -42,8 +45,9 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
}
- public ConsolePersistWriter(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
+ public ConsolePersistWriter(PrintStream printStream) {
+ this();
+ this.printStream = printStream;
}
public void prepare(Object o) {
@@ -61,8 +65,7 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
String text = mapper.writeValueAsString(entry);
- System.out.println("\n"+text+"\n");
-// LOGGER.info(text);
+ printStream.println(text);
} catch (JsonProcessingException e) {
LOGGER.warn("save: {}", e);
[07/12] incubator-streams git commit: removing validation,
as jackson does not contain a default implementation until 2.5 release
Posted by sb...@apache.org.
removing validation, as jackson does not contain a default implementation until 2.5 release
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ad5f90cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ad5f90cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ad5f90cc
Branch: refs/heads/master
Commit: ad5f90cc115c40e0ef480a85957bbb4c98157cf6
Parents: 9a57532
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 20:21:22 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 20:21:22 2014 -0500
----------------------------------------------------------------------
streams-components/streams-http/pom.xml | 6 ------
.../components/http/processor/SimpleHTTPGetProcessor.java | 6 ------
.../components/http/provider/SimpleHTTPGetProvider.java | 3 ---
3 files changed, 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ad5f90cc/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml
index 9c2b079..39a4faa 100644
--- a/streams-components/streams-http/pom.xml
+++ b/streams-components/streams-http/pom.xml
@@ -38,12 +38,6 @@
<artifactId>jsonschema2pojo-core</artifactId>
<type>jar</type>
<scope>compile</scope>
- <exclusions>
- <exclusion>
- <groupId>javax.validation</groupId>
- <artifactId>validation-api</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ad5f90cc/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index c2bfef6..0d17cc6 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -3,7 +3,6 @@ package org.apache.streams.components.http.processor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -25,8 +24,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.validation.Validation;
-import javax.validation.ValidatorFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -199,9 +196,6 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
@Override
public void prepare(Object configurationObject) {
- ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
- Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
-
mapper = StreamsJacksonMapper.getInstance();
uriBuilder = new URIBuilder()
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ad5f90cc/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
index 622225a..36084c7 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
@@ -89,9 +89,6 @@ public class SimpleHTTPGetProvider implements StreamsProvider {
@Override
public void prepare(Object configurationObject) {
-// ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-// Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
-
mapper = StreamsJacksonMapper.getInstance();
uriBuilder = new URIBuilder()
[11/12] incubator-streams git commit: adds arbitrary Joda format
support to StreamsJacksonMapper
Posted by sb...@apache.org.
adds arbitrary Joda format support to StreamsJacksonMapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e83659c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e83659c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e83659c5
Branch: refs/heads/master
Commit: e83659c51138b285dfa738739a765c7a222890c5
Parents: dbd7d89
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 13:58:11 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 7 10:47:21 2014 -0800
----------------------------------------------------------------------
.../jackson/StreamsDateTimeDeserializer.java | 23 +++++++++++++++++++-
.../streams/jackson/StreamsJacksonMapper.java | 20 +++++++++++++++++
.../streams/jackson/StreamsJacksonModule.java | 9 ++++++++
3 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e83659c5/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
index e0b98b2..8f53954 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
@@ -21,23 +21,44 @@ package org.apache.streams.jackson;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.google.common.collect.Lists;
import org.apache.streams.data.util.RFC3339Utils;
import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
/**
* Created by sblackmon on 3/27/14.
*/
public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> implements Serializable {
+ List<DateTimeFormatter> formatters = Lists.newArrayList();
+
protected StreamsDateTimeDeserializer(Class<DateTime> dateTimeClass) {
super(dateTimeClass);
}
+ protected StreamsDateTimeDeserializer(Class<DateTime> dateTimeClass, List<String> formats) {
+ super(dateTimeClass);
+ for( String format : formats )
+ formatters.add(DateTimeFormat.forPattern(format));
+ }
+
@Override
public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
- return RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());
+
+ DateTime result = RFC3339Utils.parseToUTC(jpar.getValueAsString());
+ Iterator<DateTimeFormatter> iterator = formatters.iterator();
+ while( result == null && iterator.hasNext()) {
+ DateTimeFormatter formatter = iterator.next();
+ result = formatter.parseDateTime(jpar.getValueAsString());
+ }
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e83659c5/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
index 25c0c89..8a74caa 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import java.util.List;
+
/**
* Created by sblackmon on 3/27/14.
*/
@@ -36,9 +38,27 @@ public class StreamsJacksonMapper extends ObjectMapper {
return INSTANCE;
}
+ public static StreamsJacksonMapper getInstance(List<String> formats){
+
+ StreamsJacksonMapper instance = new StreamsJacksonMapper(formats);
+
+ return instance;
+
+ }
+
public StreamsJacksonMapper() {
super();
registerModule(new StreamsJacksonModule());
+ configure();
+ }
+
+ public StreamsJacksonMapper(List<String> formats) {
+ super();
+ registerModule(new StreamsJacksonModule(formats));
+ configure();
+ }
+
+ public void configure() {
disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e83659c5/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index 2869414..8b44b0f 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import org.joda.time.DateTime;
import org.joda.time.Period;
+import java.util.List;
+
/**
* Created by sblackmon on 3/27/14.
*/
@@ -36,5 +38,12 @@ public class StreamsJacksonModule extends SimpleModule {
addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class));
}
+ public StreamsJacksonModule(List<String> formats) {
+ super();
+ addSerializer(DateTime.class, new StreamsDateTimeSerializer(DateTime.class));
+ addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class, formats));
+ addSerializer(Period.class, new StreamsPeriodSerializer(Period.class));
+ addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class));
+ }
}
[09/12] incubator-streams git commit: added license header and
javadoc comments
Posted by sb...@apache.org.
added license header and javadoc comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d2ffe62b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d2ffe62b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d2ffe62b
Branch: refs/heads/master
Commit: d2ffe62baf88618c7c8a6b9a1e5a79d21ed9e3fd
Parents: 0c8e67c
Author: sblackmon <sb...@apache.org>
Authored: Mon Oct 13 17:07:19 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Oct 13 17:07:19 2014 -0500
----------------------------------------------------------------------
.../components/http/HttpConfigurator.java | 2 +-
.../http/processor/SimpleHTTPGetProcessor.java | 18 +++++++++++++++++
.../http/provider/SimpleHTTPGetProvider.java | 18 +++++++++++++++++
.../peoplepattern/AccountTypeProcessor.java | 2 +-
.../processor/TwitterUrlApiProcessor.java | 20 ++++++++++++++++++-
.../apache/streams/data/util/ExtensionUtil.java | 21 ++++++++++++++++++++
6 files changed, 78 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
index 900831f..979a680 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
+ * Converts a {@link com.typesafe.config.Config} element into an instance of HttpConfiguration
*/
public class HttpConfigurator {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index d3d4429..b8c957c 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.components.http.processor;
import com.fasterxml.jackson.core.JsonProcessingException;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
index 36084c7..118d06b 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.components.http.provider;
import com.fasterxml.jackson.databind.JsonNode;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
index edcf4d3..4e4a6af 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Enrich actor with demographics
+ * Enrich actor with account type
*/
public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 54e1369..17ce411 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.twitter.processor;
import com.google.common.base.Preconditions;
@@ -13,7 +31,7 @@ import java.util.List;
import java.util.Map;
/**
- * Created by sblackmon on 9/14/14.
+ * Class gets a global share count from Twitter API for links on Activity datums
*/
public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
index 7ce013c..1e0e384 100644
--- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.data.util;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -7,6 +25,9 @@ import org.apache.streams.pojo.json.ActivityObject;
import java.util.Map;
+/**
+ * Class makes it easier to manage extensions added to activities, actors, objects, etc...
+ */
public class ExtensionUtil {
/**
[06/12] incubator-streams git commit: committing PeoplePattern
processor
Posted by sb...@apache.org.
committing PeoplePattern processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9a575322
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9a575322
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9a575322
Branch: refs/heads/master
Commit: 9a575322231e4a8a69c56f06da743ffe3211ccb4
Parents: d62061d
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 20:20:52 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 20:20:52 2014 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 5 +-
.../streams-processor-peoplepattern/pom.xml | 138 +++++++++++++++++++
.../peoplepattern/AccountTypeProcessor.java | 76 ++++++++++
.../peoplepattern/DemographicsProcessor.java | 77 +++++++++++
.../streams/peoplepattern/AccountType.json | 27 ++++
.../streams/peoplepattern/Demographics.json | 60 ++++++++
.../resources/templates/peoplepatternactor.json | 25 ++++
7 files changed, 406 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index e290466..fcec297 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,17 +44,18 @@
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
- <module>streams-amazon-aws</module>
+ <module>streams-amazon-aws</module>
<!--<module>streams-processor-lucene</module>-->
<!--<module>streams-processor-tika</module>-->
- <module>streams-provider-instagram</module>
<module>streams-processor-jackson</module>
<module>streams-processor-json</module>
<module>streams-processor-urls</module>
+ <module>streams-processor-peoplepattern</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
<module>streams-provider-google</module>
<module>streams-provider-gnip</module>
+ <module>streams-provider-instagram</module>
<module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
<module>streams-provider-sysomos</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/pom.xml b/streams-contrib/streams-processor-peoplepattern/pom.xml
new file mode 100644
index 0000000..b810200
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>streams-processor-peoplepattern</artifactId>
+ <version>0.1-SNAPSHOT</version>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-contrib</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo/**/*.java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-source-jaxb2</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jaxb2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema/org/apache/streams/peoplepattern</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.peoplepattern</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
new file mode 100644
index 0000000..d180b7f
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Enrich actor with demographics
+ */
+public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
+
+ private final static String STREAMS_ID = "AccountTypeProcessor";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class);
+
+ public AccountTypeProcessor() {
+ this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern")));
+ }
+
+ public AccountTypeProcessor(HttpProcessorConfiguration peoplePatternConfiguration) {
+ super(peoplePatternConfiguration);
+ LOGGER.info("creating AccountTypeProcessor");
+ configuration.setProtocol("https");
+ configuration.setHostname("api.peoplepattern.com");
+ configuration.setResourcePath("/v0.2/account_type/");
+ configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
+ configuration.setExtension("account_type");
+ }
+
+ /**
+ Override this to add parameters to the request
+ */
+ @Override
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+ Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+ //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
+ Actor actor = activity.getActor();
+ ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
+ String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+ Map<String, String> params = Maps.newHashMap();
+ params.put("id", actor.getId());
+ params.put("name", actor.getDisplayName());
+ params.put("username", username);
+ params.put("description", actor.getSummary());
+ return params;
+ }
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
new file mode 100644
index 0000000..6ffbb9b
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Enrich actor with demographics
+ */
+public class DemographicsProcessor extends SimpleHTTPGetProcessor {
+
+ public final static String STREAMS_ID = "DemographicsProcessor";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class);
+
+ public DemographicsProcessor() {
+ this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern")));
+ }
+
+ public DemographicsProcessor(HttpProcessorConfiguration peoplePatternConfiguration) {
+ super(peoplePatternConfiguration);
+ LOGGER.info("creating DemographicsProcessor");
+ configuration.setProtocol("https");
+ configuration.setHostname("api.peoplepattern.com");
+ configuration.setResourcePath("/v0.2/demographics/");
+ configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
+ configuration.setExtension("demographics");
+ }
+
+ /**
+ Override this to add parameters to the request
+ */
+ @Override
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+ Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+ //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
+ Actor actor = activity.getActor();
+ ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
+ String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+ Map<String, String> params = Maps.newHashMap();
+ params.put("id", actor.getId());
+ params.put("name", actor.getDisplayName());
+ params.put("username", username);
+ params.put("description", actor.getSummary());
+ return params;
+ }
+
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
new file mode 100644
index 0000000..5656b44
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
@@ -0,0 +1,27 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType": "org.apache.streams.peoplepattern.AccountType",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "prediction" : {
+ "type" : "string",
+ "enum" : [
+ "person",
+ "organization",
+ "entertainment",
+ "adult",
+ "spam",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ },
+ "id": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
new file mode 100644
index 0000000..d1f64d8
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
@@ -0,0 +1,60 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType": "org.apache.streams.peoplepattern.Demographics",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "age": {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "integer",
+ "default": 1990
+ },
+ "score": {
+ "type": "number"
+ }
+
+ }
+ },
+ "gender" : {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "string",
+ "enum": [
+ "male",
+ "female",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ }
+ }
+ },
+ "race" : {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "string",
+ "enum": [
+ "black",
+ "east-asian",
+ "hispanic",
+ "middle-eastern",
+ "south-asian",
+ "white",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
new file mode 100644
index 0000000..9a24c5c
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
@@ -0,0 +1,25 @@
+{
+ "order": 20,
+ "template": "*activity*",
+ "settings": {},
+ "mappings": {
+ "activity": {
+ "properties": {
+ "actor": {
+ "properties": {
+ "extensions": {
+ "properties": {
+ "account_type": {
+ "type": "nested"
+ },
+ "demographics": {
+ "type": "nested"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
[12/12] incubator-streams git commit: Merge branch 'STREAMS-168'
Posted by sb...@apache.org.
Merge branch 'STREAMS-168'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab9c6963
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab9c6963
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab9c6963
Branch: refs/heads/master
Commit: ab9c6963d8d3524ab88703a73a6c0e05842a5fee
Parents: e83659c d2ffe62
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 7 10:48:50 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 7 10:48:50 2014 -0800
----------------------------------------------------------------------
pom.xml | 2 +
streams-components/pom.xml | 62 +++++
streams-components/streams-http/README.md | 16 ++
streams-components/streams-http/pom.xml | 153 +++++++++++
.../components/http/HttpConfigurator.java | 62 +++++
.../http/processor/SimpleHTTPGetProcessor.java | 268 +++++++++++++++++++
.../http/provider/SimpleHTTPGetProvider.java | 230 ++++++++++++++++
.../components/http/HttpConfiguration.json | 50 ++++
.../http/HttpProcessorConfiguration.json | 28 ++
.../http/HttpProviderConfiguration.json | 18 ++
streams-contrib/pom.xml | 5 +-
.../streams-processor-peoplepattern/pom.xml | 138 ++++++++++
.../peoplepattern/AccountTypeProcessor.java | 75 ++++++
.../peoplepattern/DemographicsProcessor.java | 76 ++++++
.../streams/peoplepattern/AccountType.json | 27 ++
.../streams/peoplepattern/Demographics.json | 60 +++++
.../resources/templates/peoplepatternactor.json | 25 ++
.../api/FacebookPostActivitySerializer.java | 1 -
.../streams-provider-twitter/pom.xml | 5 +
.../processor/TwitterUrlApiProcessor.java | 73 +++++
.../provider/TwitterTimelineProvider.java | 7 +-
streams-pojo-extensions/pom.xml | 64 +++++
.../apache/streams/data/util/ExtensionUtil.java | 108 ++++++++
.../apache/streams/data/util/ActivityUtil.java | 14 +-
.../org/apache/streams/pojo/json/activity.json | 3 +-
.../org/apache/streams/pojo/json/object.json | 2 +-
26 files changed, 1559 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab9c6963/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab9c6963/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab9c6963/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab9c6963/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
[04/12] incubator-streams git commit: class need not be abstract
rootDocument is what should exit processor
Posted by sb...@apache.org.
class need not be abstract
rootDocument is what should exit processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a5764609
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a5764609
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a5764609
Branch: refs/heads/master
Commit: a576460911464cf260e37886ce840f4e57dd5f30
Parents: b8ccf9f
Author: sblackmon <sb...@apache.org>
Authored: Tue Sep 16 14:24:53 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Sep 16 14:24:53 2014 -0500
----------------------------------------------------------------------
.../apache/streams/components/http/SimpleHTTPGetProcessor.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a5764609/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
index dec9d03..d74793a 100644
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
@@ -40,7 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
+public class SimpleHTTPGetProcessor implements StreamsProcessor {
private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
@@ -152,7 +152,7 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
try {
response = httpclient.execute(httpget);
HttpEntity entity = response.getEntity();
- // TODO: handle rate-limiting
+ // TODO: handle retry
if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
entityString = EntityUtils.toString(entity);
}
@@ -181,7 +181,7 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
- entry.setDocument(extensionEntity);
+ entry.setDocument(rootDocument);
result.add(entry);
[05/12] incubator-streams git commit: introduced a
SimpleHTTPGetProvider reorganized configuration added access_token support
added http basic auth support
Posted by sb...@apache.org.
introduced a SimpleHTTPGetProvider
reorganized configuration
added access_token support
added http basic auth support
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d62061de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d62061de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d62061de
Branch: refs/heads/master
Commit: d62061dec8628484e66d3494e02f1f65efafda41
Parents: a576460
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 17:49:01 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 17:49:01 2014 -0500
----------------------------------------------------------------------
streams-components/pom.xml | 2 +-
streams-components/streams-http/README.md | 16 ++
streams-components/streams-http/pom.xml | 159 +++++++++++++
.../components/http/HttpConfigurator.java | 62 +++++
.../http/processor/SimpleHTTPGetProcessor.java | 230 +++++++++++++++++++
.../http/provider/SimpleHTTPGetProvider.java | 215 +++++++++++++++++
.../components/http/HttpConfiguration.json | 50 ++++
.../http/HttpProcessorConfiguration.json | 28 +++
.../http/HttpProviderConfiguration.json | 18 ++
.../streams-processor-http/README.md | 16 --
.../streams-processor-http/pom.xml | 154 -------------
.../components/http/HttpConfigurator.java | 53 -----
.../components/http/SimpleHTTPGetProcessor.java | 218 ------------------
.../HttpProcessorConfiguration.json | 47 ----
14 files changed, 779 insertions(+), 489 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/pom.xml b/streams-components/pom.xml
index 26384b1..9942e14 100644
--- a/streams-components/pom.xml
+++ b/streams-components/pom.xml
@@ -37,7 +37,7 @@
</properties>
<modules>
- <module>streams-processor-http</module>
+ <module>streams-http</module>
</modules>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/README.md b/streams-components/streams-http/README.md
new file mode 100644
index 0000000..62dd4c1
--- /dev/null
+++ b/streams-components/streams-http/README.md
@@ -0,0 +1,16 @@
+streams-processor-http
+=====================
+
+Hit an http endpoint and place the result in extensions
+
+Example SimpleHTTPGetProcessor configuration:
+
+ "http": {
+ "protocol": "http",
+ "hostname": "urls.api.twitter.com",
+ "port": 9300,
+ "resourceUri": "1/urls/count.json"
+ }
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml
new file mode 100644
index 0000000..9c2b079
--- /dev/null
+++ b/streams-components/streams-http/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-components</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>streams-http</artifactId>
+
+ <name>streams-http</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo-extensions</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.3.5</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-source-jaxb2</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jaxb2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.http</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ <includeJsr303Annotations>true</includeJsr303Annotations>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
new file mode 100644
index 0000000..900831f
--- /dev/null
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
+ */
+public class HttpConfigurator {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class);
+
+ private final static ObjectMapper mapper = new ObjectMapper();
+
+ public static HttpProviderConfiguration detectProviderConfiguration(Config config) {
+
+ HttpProviderConfiguration httpProviderConfiguration = null;
+
+ try {
+ httpProviderConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProviderConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse http configuration", e.getMessage());
+ }
+ return httpProviderConfiguration;
+ }
+
+ public static HttpProcessorConfiguration detectProcessorConfiguration(Config config) {
+
+ HttpProcessorConfiguration httpProcessorConfiguration = null;
+
+ try {
+ httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse http configuration", e.getMessage());
+ }
+ return httpProcessorConfiguration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
new file mode 100644
index 0000000..c2bfef6
--- /dev/null
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -0,0 +1,230 @@
+package org.apache.streams.components.http.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.Validation;
+import javax.validation.ValidatorFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Processor retrieves contents from an known url and stores the resulting object in an extension field
+ */
+public class SimpleHTTPGetProcessor implements StreamsProcessor {
+
+ private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+ // from root config id
+ private final static String EXTENSION = "account_type";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
+
+ protected ObjectMapper mapper;
+
+ protected URIBuilder uriBuilder;
+
+ protected CloseableHttpClient httpclient;
+
+ protected HttpProcessorConfiguration configuration;
+
+ protected String authHeader;
+//
+// // authorized only
+// //private PeoplePatternConfiguration peoplePatternConfiguration = null;
+// //private String authHeader;
+//
+ public SimpleHTTPGetProcessor() {
+ this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http")));
+ }
+
+ public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
+ LOGGER.info("creating SimpleHTTPGetProcessor");
+ LOGGER.info(processorConfiguration.toString());
+ this.configuration = processorConfiguration;
+ }
+
+
+ /**
+ Override this to store a result other than exact json representation of response
+ */
+ protected ObjectNode prepareExtensionFragment(String entityString) {
+
+ try {
+ return mapper.readValue(entityString, ObjectNode.class);
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ }
+ }
+
+ /**
+ Override this to place result in non-standard location on document
+ */
+ protected ObjectNode getRootDocument(StreamsDatum datum) {
+
+ try {
+ String json = datum.getDocument() instanceof String ?
+ (String) datum.getDocument() :
+ mapper.writeValueAsString(datum.getDocument());
+ return mapper.readValue(json, ObjectNode.class);
+ } catch (JsonProcessingException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ }
+
+ }
+ /**
+ Override this to place result in non-standard location on document
+ */
+ protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+ if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+ return rootDocument;
+ else
+ return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
+
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ ObjectNode rootDocument = getRootDocument(entry);
+
+ Map<String, String> params = prepareParams(entry);
+
+ URI uri;
+ for( Map.Entry<String,String> param : params.entrySet()) {
+ uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
+ }
+ try {
+ uri = uriBuilder.build();
+ } catch (URISyntaxException e) {
+ LOGGER.error("URI error {}", uriBuilder.toString());
+ return result;
+ }
+
+ HttpGet httpget = prepareHttpGet(uri);
+
+ CloseableHttpResponse response = null;
+
+ String entityString = null;
+ try {
+ response = httpclient.execute(httpget);
+ HttpEntity entity = response.getEntity();
+ // TODO: handle retry
+ if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+ entityString = EntityUtils.toString(entity);
+ }
+ } catch (IOException e) {
+ LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
+ return result;
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {}
+ try {
+ httpclient.close();
+ } catch (IOException e) {}
+ }
+
+ if( entityString == null )
+ return result;
+
+ LOGGER.debug(entityString);
+
+ ObjectNode extensionFragment = prepareExtensionFragment(entityString);
+
+ ObjectNode extensionEntity = getEntityToExtend(rootDocument);
+
+ ExtensionUtil.ensureExtensions(extensionEntity);
+
+ ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
+
+ entry.setDocument(rootDocument);
+
+ result.add(entry);
+
+ return result;
+
+ }
+
+ /**
+ Override this to add parameters to the request
+ */
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+ return Maps.newHashMap();
+ }
+
+
+ public HttpGet prepareHttpGet(URI uri) {
+ HttpGet httpget = new HttpGet(uri);
+ httpget.addHeader("content-type", this.configuration.getContentType());
+ if( !Strings.isNullOrEmpty(authHeader))
+ httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+ return httpget;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+ Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
+
+ mapper = StreamsJacksonMapper.getInstance();
+
+ uriBuilder = new URIBuilder()
+ .setScheme(this.configuration.getProtocol())
+ .setHost(this.configuration.getHostname())
+ .setPath(this.configuration.getResourcePath());
+
+ if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
+ uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
+ if( !Strings.isNullOrEmpty(configuration.getUsername())
+ && !Strings.isNullOrEmpty(configuration.getPassword())) {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(configuration.getUsername());
+ stringBuilder.append(":");
+ stringBuilder.append(configuration.getPassword());
+ String string = stringBuilder.toString();
+ authHeader = Base64.encodeBase64String(string.getBytes());
+ }
+ httpclient = HttpClients.createDefault();
+ }
+
+ @Override
+ public void cleanUp() {
+ LOGGER.info("shutting down SimpleHTTPGetProcessor");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
new file mode 100644
index 0000000..622225a
--- /dev/null
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
@@ -0,0 +1,215 @@
+package org.apache.streams.components.http.provider;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Provider retrieves contents from an known set of urls and passes all resulting objects downstream
+ */
+public class SimpleHTTPGetProvider implements StreamsProvider {
+
+ private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+ // from root config id
+ private final static String EXTENSION = "account_type";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProvider.class);
+
+ protected ObjectMapper mapper;
+
+ protected URIBuilder uriBuilder;
+
+ protected CloseableHttpClient httpclient;
+
+ protected HttpProviderConfiguration configuration;
+
+ protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ // // authorized only
+// //private PeoplePatternConfiguration peoplePatternConfiguration = null;
+// //private String authHeader;
+//
+ public SimpleHTTPGetProvider() {
+ this(HttpConfigurator.detectProviderConfiguration(StreamsConfigurator.config.getConfig("http")));
+ }
+
+ public SimpleHTTPGetProvider(HttpProviderConfiguration providerConfiguration) {
+ LOGGER.info("creating SimpleHTTPGetProvider");
+ LOGGER.info(providerConfiguration.toString());
+ this.configuration = providerConfiguration;
+ }
+
+ /**
+ Override this to add parameters to the request
+ */
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+ return Maps.newHashMap();
+ }
+
+ public HttpGet prepareHttpGet(URI uri) {
+ HttpGet httpget = new HttpGet(uri);
+ httpget.addHeader("content-type", this.configuration.getContentType());
+ return httpget;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+// ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+// Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
+
+ mapper = StreamsJacksonMapper.getInstance();
+
+ uriBuilder = new URIBuilder()
+ .setScheme(this.configuration.getProtocol())
+ .setHost(this.configuration.getHostname())
+ .setPath(this.configuration.getResourcePath());
+
+ httpclient = HttpClients.createDefault();
+ }
+
+ @Override
+ public void cleanUp() {
+
+ LOGGER.info("shutting down SimpleHTTPGetProvider");
+ try {
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ httpclient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ httpclient = null;
+ }
+ }
+ }
+
+ @Override
+ public void startStream() {
+
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ StreamsResultSet current;
+
+ uriBuilder = uriBuilder.setPath(
+ Joiner.on("/").skipNulls().join(uriBuilder.getPath(), configuration.getResource(), configuration.getResourcePostfix())
+ );
+
+ URI uri;
+ try {
+ uri = uriBuilder.build();
+ } catch (URISyntaxException e) {
+ uri = null;
+ }
+
+ List<ObjectNode> results = executeGet(uri);
+
+ lock.writeLock().lock();
+
+ for( ObjectNode item : results ) {
+ providerQueue.add(new StreamsDatum(item, item.get("id").asText(), new DateTime(item.get("timestamp").asText())));
+ }
+
+ LOGGER.debug("Creating new result set for {} items", providerQueue.size());
+ current = new StreamsResultSet(providerQueue);
+
+ return current;
+ }
+
+ protected List<ObjectNode> executeGet(URI uri) {
+
+ Preconditions.checkNotNull(uri);
+
+ List<ObjectNode> results = new ArrayList<>();
+
+ HttpGet httpget = prepareHttpGet(uri);
+
+ CloseableHttpResponse response = null;
+
+ String entityString = null;
+ try {
+ response = httpclient.execute(httpget);
+ HttpEntity entity = response.getEntity();
+ // TODO: handle retry
+ if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+ entityString = EntityUtils.toString(entity);
+ if( !entityString.equals("{}") && !entityString.equals("[]") ) {
+ JsonNode jsonNode = mapper.readValue(entityString, JsonNode.class);
+ if (jsonNode != null && jsonNode instanceof ObjectNode ) {
+
+ results.add((ObjectNode) jsonNode);
+ } else if (jsonNode != null && jsonNode instanceof ArrayNode) {
+ ArrayNode arrayNode = (ArrayNode) jsonNode;
+ Iterator<JsonNode> iterator = arrayNode.elements();
+ while (iterator.hasNext()) {
+ ObjectNode element = (ObjectNode) iterator.next();
+
+ results.add(element);
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {}
+ }
+ return results;
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
new file mode 100644
index 0000000..b4dc243
--- /dev/null
+++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
@@ -0,0 +1,50 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.components.http.HttpConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "protocol": {
+ "type": "string",
+ "description": "Protocol",
+ "default": "http"
+ },
+ "hostname": {
+ "type": "string",
+ "description": "Hostname",
+ "required" : true
+ },
+ "port": {
+ "type": "integer",
+ "description": "Port",
+ "default": 80
+ },
+ "resourcePath": {
+ "type": "string",
+ "description": "Resource Path",
+ "required" : true
+ },
+ "content-type": {
+ "type": "string",
+ "description": "Resource content-type",
+ "required" : true,
+ "default": "application/json"
+ },
+ "access_token": {
+ "type": "string",
+ "description": "Known Access Token",
+ "required" : false
+ },
+ "username": {
+ "type": "string",
+ "description": "Basic Auth Username",
+ "required" : false
+ },
+ "password": {
+ "type": "string",
+ "description": "Basic Auth Password",
+ "required" : false
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
new file mode 100644
index 0000000..32e4c23
--- /dev/null
+++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
@@ -0,0 +1,28 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": { "$ref": "HttpConfiguration.json" },
+ "properties": {
+ "entity": {
+ "type": "string",
+ "description": "Entity to extend",
+ "enum": [ "activity", "actor", "object", "target" ],
+ "required" : true,
+ "default": "activity"
+ },
+ "extension": {
+ "type": "string",
+ "description": "Extension identifier",
+ "required" : true
+ },
+ "urlField": {
+ "type": "string",
+ "description": "Field where url is located",
+ "required" : true,
+ "default": "url"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
new file mode 100644
index 0000000..2c135d9
--- /dev/null
+++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
@@ -0,0 +1,18 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.components.http.HttpProviderConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": { "$ref": "HttpConfiguration.json" },
+ "properties": {
+ "resource": {
+ "type": "string",
+ "required" : false
+ },
+ "resourcePostfix": {
+ "type": "string",
+ "required" : false
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/README.md b/streams-components/streams-processor-http/README.md
deleted file mode 100644
index 62dd4c1..0000000
--- a/streams-components/streams-processor-http/README.md
+++ /dev/null
@@ -1,16 +0,0 @@
-streams-processor-http
-=====================
-
-Hit an http endpoint and place the result in extensions
-
-Example SimpleHTTPGetProcessor configuration:
-
- "http": {
- "protocol": "http",
- "hostname": "urls.api.twitter.com",
- "port": 9300,
- "resourceUri": "1/urls/count.json"
- }
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml
deleted file mode 100644
index d9215ad..0000000
--- a/streams-components/streams-processor-http/pom.xml
+++ /dev/null
@@ -1,154 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-components</artifactId>
- <version>0.1-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>streams-processor-http</artifactId>
-
- <name>streams-processor-http</name>
-
- <dependencies>
-
- <dependency>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-core</artifactId>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-config</artifactId>
- </dependency>
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-pojo</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-pojo-extensions</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.3.5</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
-
- </dependencies>
-
- <build>
- <sourceDirectory>src/main/java</sourceDirectory>
- <testSourceDirectory>src/test/java</testSourceDirectory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- <testResources>
- <testResource>
- <directory>src/test/resources</directory>
- </testResource>
- </testResources>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>target/generated-sources/jsonschema2pojo</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-source-jaxb2</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>target/generated-sources/jaxb2</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-maven-plugin</artifactId>
- <configuration>
- <addCompileSourceRoot>true</addCompileSourceRoot>
- <generateBuilders>true</generateBuilders>
- <sourcePaths>
- <sourcePath>src/main/jsonschema</sourcePath>
- </sourcePaths>
- <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
- <targetPackage>org.apache.streams.http</targetPackage>
- <useLongIntegers>true</useLongIntegers>
- <useJodaDates>true</useJodaDates>
- <includeJsr303Annotations>true</includeJsr303Annotations>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>generate</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
deleted file mode 100644
index 36801b8..0000000
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.components.http;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
- */
-public class HttpConfigurator {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class);
-
- private final static ObjectMapper mapper = new ObjectMapper();
-
- public static HttpProcessorConfiguration detectConfiguration(Config config) {
-
- HttpProcessorConfiguration httpProcessorConfiguration = null;
-
- try {
- httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class);
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn("Could not parse http configuration", e.getMessage());
- }
- return httpProcessorConfiguration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
deleted file mode 100644
index d74793a..0000000
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
+++ /dev/null
@@ -1,218 +0,0 @@
-package org.apache.streams.components.http;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.data.util.ExtensionUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.validation.Validation;
-import javax.validation.ValidatorFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class SimpleHTTPGetProcessor implements StreamsProcessor {
-
- private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
-
- // from root config id
- private final static String EXTENSION = "account_type";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
-
- protected ObjectMapper mapper;
-
- protected URIBuilder uriBuilder;
-
- protected CloseableHttpClient httpclient;
-
- protected HttpProcessorConfiguration configuration;
-//
-// // authorized only
-// //private PeoplePatternConfiguration peoplePatternConfiguration = null;
-// //private String authHeader;
-//
- public SimpleHTTPGetProcessor() {
- LOGGER.info("creating SimpleHTTPGetProcessor");
- this.configuration = HttpConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("http"));
- }
-
- public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
- LOGGER.info("creating SimpleHTTPGetProcessor");
- LOGGER.info(processorConfiguration.toString());
- this.configuration = processorConfiguration;
- }
-
- /**
- Override this to add parameters to the request
- */
- protected Map<String, String> prepareParams(StreamsDatum entry) {
-
- return Maps.newHashMap();
- }
-
- /**
- Override this to store a result other than exact json representation of response
- */
- protected ObjectNode prepareExtensionFragment(String entityString) {
-
- try {
- return mapper.readValue(entityString, ObjectNode.class);
- } catch (IOException e) {
- LOGGER.warn(e.getMessage());
- return null;
- }
- }
-
- /**
- Override this to place result in non-standard location on document
- */
- protected ObjectNode getRootDocument(StreamsDatum datum) {
-
- try {
- String json = datum.getDocument() instanceof String ?
- (String) datum.getDocument() :
- mapper.writeValueAsString(datum.getDocument());
- return mapper.readValue(json, ObjectNode.class);
- } catch (JsonProcessingException e) {
- LOGGER.warn(e.getMessage());
- return null;
- } catch (IOException e) {
- LOGGER.warn(e.getMessage());
- return null;
- }
-
- }
- /**
- Override this to place result in non-standard location on document
- */
- protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
-
- if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
- return rootDocument;
- else
- return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
-
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- List<StreamsDatum> result = Lists.newArrayList();
-
- ObjectNode rootDocument = getRootDocument(entry);
-
- Map<String, String> params = prepareParams(entry);
-
- URI uri;
- for( Map.Entry<String,String> param : params.entrySet()) {
- uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
- }
- try {
- uri = uriBuilder.build();
- } catch (URISyntaxException e) {
- LOGGER.error("URI error {}", uriBuilder.toString());
- return result;
- }
-
- HttpGet httpget = prepareHttpGet(uri);
-
- CloseableHttpResponse response = null;
-
- String entityString = null;
- try {
- response = httpclient.execute(httpget);
- HttpEntity entity = response.getEntity();
- // TODO: handle retry
- if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
- entityString = EntityUtils.toString(entity);
- }
- } catch (IOException e) {
- LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
- return result;
- } finally {
- try {
- response.close();
- } catch (IOException e) {}
- try {
- httpclient.close();
- } catch (IOException e) {}
- }
-
- if( entityString == null )
- return result;
-
- LOGGER.debug(entityString);
-
- ObjectNode extensionFragment = prepareExtensionFragment(entityString);
-
- ObjectNode extensionEntity = getEntityToExtend(rootDocument);
-
- ExtensionUtil.ensureExtensions(extensionEntity);
-
- ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
-
- entry.setDocument(rootDocument);
-
- result.add(entry);
-
- return result;
-
- }
-
- public HttpGet prepareHttpGet(URI uri) {
- HttpGet httpget = new HttpGet(uri);
- httpget.addHeader("content-type", this.configuration.getContentType());
- return httpget;
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
- ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
- Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
-
- mapper = StreamsJacksonMapper.getInstance();
-
- uriBuilder = new URIBuilder()
- .setScheme(this.configuration.getProtocol())
- .setHost(this.configuration.getHostname())
- .setPath(this.configuration.getResourceUri());
-
- httpclient = HttpClients.createDefault();
- }
-
- @Override
- public void cleanUp() {
- LOGGER.info("shutting down SimpleHTTPGetProcessor");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
deleted file mode 100644
index 40c3bcd..0000000
--- a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
+++ /dev/null
@@ -1,47 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "id": "#",
- "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "protocol": {
- "type": "string",
- "description": "Protocol",
- "default": "http"
- },
- "hostname": {
- "type": "string",
- "description": "Hostname",
- "required" : true
- },
- "port": {
- "type": "integer",
- "description": "Port",
- "default": 80
- },
- "resourceUri": {
- "type": "string",
- "description": "Resource URI",
- "required" : true
- },
- "content-type": {
- "type": "string",
- "description": "Resource URI",
- "required" : true,
- "default": "application/json"
- },
- "entity": {
- "type": "string",
- "description": "Entity to extend",
- "enum": [ "activity", "actor", "object", "target" ],
- "required" : true,
- "default": "activity"
- },
- "extension": {
- "type": "string",
- "description": "Extension identifier",
- "required" : true
- }
- }
-}
\ No newline at end of file
[03/12] incubator-streams git commit: added HttpConfigurator
Posted by sb...@apache.org.
added HttpConfigurator
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b8ccf9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b8ccf9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b8ccf9f6
Branch: refs/heads/master
Commit: b8ccf9f6869c64f1f6e25f668942ea0de9fec2eb
Parents: 34232ad
Author: sblackmon <sb...@apache.org>
Authored: Mon Sep 15 18:38:09 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Sep 15 18:38:09 2014 -0500
----------------------------------------------------------------------
.../components/http/HttpConfigurator.java | 53 ++++++++++++++++++++
.../components/http/SimpleHTTPGetProcessor.java | 23 +++++----
.../processor/TwitterUrlApiProcessor.java | 8 +++
3 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
new file mode 100644
index 0000000..36801b8
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
+ */
+public class HttpConfigurator {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class);
+
+ private final static ObjectMapper mapper = new ObjectMapper();
+
+ public static HttpProcessorConfiguration detectConfiguration(Config config) {
+
+ HttpProcessorConfiguration httpProcessorConfiguration = null;
+
+ try {
+ httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse http configuration", e.getMessage());
+ }
+ return httpProcessorConfiguration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
index d76d839..dec9d03 100644
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
@@ -19,6 +19,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.util.ActivityUtil;
@@ -34,6 +35,7 @@ import javax.validation.ValidatorFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -59,6 +61,11 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
// //private PeoplePatternConfiguration peoplePatternConfiguration = null;
// //private String authHeader;
//
+ public SimpleHTTPGetProcessor() {
+ LOGGER.info("creating SimpleHTTPGetProcessor");
+ this.configuration = HttpConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("http"));
+ }
+
public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
LOGGER.info("creating SimpleHTTPGetProcessor");
LOGGER.info(processorConfiguration.toString());
@@ -137,9 +144,7 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
return result;
}
- HttpGet httpget = new HttpGet(uri);
- httpget.addHeader("content-type", this.configuration.getContentType());
- //httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+ HttpGet httpget = prepareHttpGet(uri);
CloseableHttpResponse response = null;
@@ -184,6 +189,12 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
}
+ public HttpGet prepareHttpGet(URI uri) {
+ HttpGet httpget = new HttpGet(uri);
+ httpget.addHeader("content-type", this.configuration.getContentType());
+ return httpget;
+ }
+
@Override
public void prepare(Object configurationObject) {
@@ -198,12 +209,6 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
.setPath(this.configuration.getResourceUri());
httpclient = HttpClients.createDefault();
- // StringBuilder stringBuilder = new StringBuilder();
-// stringBuilder.append(peoplePatternConfiguration.getUsername());
-// stringBuilder.append(":");
-// stringBuilder.append(peoplePatternConfiguration.getPassword());
-// String string = stringBuilder.toString();
-// authHeader = Base64.encodeBase64String(string.getBytes());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 77965c4..438937f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -5,6 +5,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.streams.components.http.HttpProcessorConfiguration;
import org.apache.streams.components.http.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
@@ -17,6 +18,13 @@ import java.util.Map;
*/
public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
+ public TwitterUrlApiProcessor() {
+ super();
+ this.configuration.setHostname("urls.api.twitter.com");
+ this.configuration.setResourceUri("/1/urls/count.json");
+ this.configuration.setExtension("twitter_url_count");
+ }
+
public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
super(processorConfiguration);
this.configuration.setHostname("urls.api.twitter.com");
[02/12] incubator-streams git commit: added streams-components module
added streams-processor-http module activated streams-pojo-extensions module
tweaks to pojo, require id and remove non-sensical defaults
Posted by sb...@apache.org.
added streams-components module
added streams-processor-http module
activated streams-pojo-extensions module
tweaks to pojo, require id and remove non-sensical defaults
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/34232ad8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/34232ad8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/34232ad8
Branch: refs/heads/master
Commit: 34232ad87b50253913fba0dabb6fa9af591f388d
Parents: 1464819
Author: sblackmon <sb...@apache.org>
Authored: Mon Sep 15 13:50:32 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Sep 15 13:50:32 2014 -0500
----------------------------------------------------------------------
pom.xml | 2 +
.../streams-processor-http/README.md | 16 ++
.../streams-processor-http/pom.xml | 42 +++-
.../components/http/SimpleHTTPGetProcessor.java | 213 +++++++++++++++++++
.../HttpProcessorConfiguration.json | 47 ++++
.../api/FacebookPostActivitySerializer.java | 1 -
.../streams-provider-twitter/pom.xml | 5 +
.../processor/TwitterUrlApiProcessor.java | 44 ++++
streams-pojo-extensions/pom.xml | 64 ++++++
.../apache/streams/data/util/ExtensionUtil.java | 94 ++++++++
.../apache/streams/data/util/ActivityUtil.java | 14 +-
.../org/apache/streams/pojo/json/activity.json | 3 +-
.../org/apache/streams/pojo/json/object.json | 2 +-
13 files changed, 537 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c2e1766..59fa634 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,9 @@
<module>streams-osgi-components</module>
<module>streams-core</module>
<module>streams-config</module>
+ <module>streams-components</module>
<module>streams-pojo</module>
+ <module>streams-pojo-extensions</module>
<module>streams-util</module>
<module>streams-contrib</module>
<module>streams-runtimes</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/README.md b/streams-components/streams-processor-http/README.md
new file mode 100644
index 0000000..62dd4c1
--- /dev/null
+++ b/streams-components/streams-processor-http/README.md
@@ -0,0 +1,16 @@
+streams-processor-http
+=====================
+
+Hit an http endpoint and place the result in extensions
+
+Example SimpleHTTPGetProcessor configuration:
+
+ "http": {
+ "protocol": "http",
+ "hostname": "urls.api.twitter.com",
+ "port": 9300,
+ "resourceUri": "1/urls/count.json"
+ }
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml
index 67538aa..d9215ad 100644
--- a/streams-components/streams-processor-http/pom.xml
+++ b/streams-components/streams-processor-http/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.streams</groupId>
- <artifactId>streams-project</artifactId>
+ <artifactId>streams-components</artifactId>
<version>0.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -40,6 +40,45 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo-extensions</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.3.5</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
</dependencies>
<build>
@@ -100,6 +139,7 @@
<targetPackage>org.apache.streams.http</targetPackage>
<useLongIntegers>true</useLongIntegers>
<useJodaDates>true</useJodaDates>
+ <includeJsr303Annotations>true</includeJsr303Annotations>
</configuration>
<executions>
<execution>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
new file mode 100644
index 0000000..d76d839
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
@@ -0,0 +1,213 @@
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.Validation;
+import javax.validation.ValidatorFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
+
+ private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+ // from root config id
+ private final static String EXTENSION = "account_type";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
+
+ protected ObjectMapper mapper;
+
+ protected URIBuilder uriBuilder;
+
+ protected CloseableHttpClient httpclient;
+
+ protected HttpProcessorConfiguration configuration;
+//
+// // authorized only
+// //private PeoplePatternConfiguration peoplePatternConfiguration = null;
+// //private String authHeader;
+//
+ public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
+ LOGGER.info("creating SimpleHTTPGetProcessor");
+ LOGGER.info(processorConfiguration.toString());
+ this.configuration = processorConfiguration;
+ }
+
+ /**
+ Override this to add parameters to the request
+ */
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+ return Maps.newHashMap();
+ }
+
+ /**
+ Override this to store a result other than exact json representation of response
+ */
+ protected ObjectNode prepareExtensionFragment(String entityString) {
+
+ try {
+ return mapper.readValue(entityString, ObjectNode.class);
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ }
+ }
+
+ /**
+ Override this to place result in non-standard location on document
+ */
+ protected ObjectNode getRootDocument(StreamsDatum datum) {
+
+ try {
+ String json = datum.getDocument() instanceof String ?
+ (String) datum.getDocument() :
+ mapper.writeValueAsString(datum.getDocument());
+ return mapper.readValue(json, ObjectNode.class);
+ } catch (JsonProcessingException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ }
+
+ }
+ /**
+ Override this to place result in non-standard location on document
+ */
+ protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+ if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+ return rootDocument;
+ else
+ return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
+
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ ObjectNode rootDocument = getRootDocument(entry);
+
+ Map<String, String> params = prepareParams(entry);
+
+ URI uri;
+ for( Map.Entry<String,String> param : params.entrySet()) {
+ uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
+ }
+ try {
+ uri = uriBuilder.build();
+ } catch (URISyntaxException e) {
+ LOGGER.error("URI error {}", uriBuilder.toString());
+ return result;
+ }
+
+ HttpGet httpget = new HttpGet(uri);
+ httpget.addHeader("content-type", this.configuration.getContentType());
+ //httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+
+ CloseableHttpResponse response = null;
+
+ String entityString = null;
+ try {
+ response = httpclient.execute(httpget);
+ HttpEntity entity = response.getEntity();
+ // TODO: handle rate-limiting
+ if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+ entityString = EntityUtils.toString(entity);
+ }
+ } catch (IOException e) {
+ LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
+ return result;
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {}
+ try {
+ httpclient.close();
+ } catch (IOException e) {}
+ }
+
+ if( entityString == null )
+ return result;
+
+ LOGGER.debug(entityString);
+
+ ObjectNode extensionFragment = prepareExtensionFragment(entityString);
+
+ ObjectNode extensionEntity = getEntityToExtend(rootDocument);
+
+ ExtensionUtil.ensureExtensions(extensionEntity);
+
+ ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
+
+ entry.setDocument(extensionEntity);
+
+ result.add(entry);
+
+ return result;
+
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+ Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
+
+ mapper = StreamsJacksonMapper.getInstance();
+
+ uriBuilder = new URIBuilder()
+ .setScheme(this.configuration.getProtocol())
+ .setHost(this.configuration.getHostname())
+ .setPath(this.configuration.getResourceUri());
+
+ httpclient = HttpClients.createDefault();
+ // StringBuilder stringBuilder = new StringBuilder();
+// stringBuilder.append(peoplePatternConfiguration.getUsername());
+// stringBuilder.append(":");
+// stringBuilder.append(peoplePatternConfiguration.getPassword());
+// String string = stringBuilder.toString();
+// authHeader = Base64.encodeBase64String(string.getBytes());
+ }
+
+ @Override
+ public void cleanUp() {
+ LOGGER.info("shutting down SimpleHTTPGetProcessor");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
new file mode 100644
index 0000000..40c3bcd
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
@@ -0,0 +1,47 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "protocol": {
+ "type": "string",
+ "description": "Protocol",
+ "default": "http"
+ },
+ "hostname": {
+ "type": "string",
+ "description": "Hostname",
+ "required" : true
+ },
+ "port": {
+ "type": "integer",
+ "description": "Port",
+ "default": 80
+ },
+ "resourceUri": {
+ "type": "string",
+ "description": "Resource URI",
+ "required" : true
+ },
+ "content-type": {
+ "type": "string",
+ "description": "Resource URI",
+ "required" : true,
+ "default": "application/json"
+ },
+ "entity": {
+ "type": "string",
+ "description": "Entity to extend",
+ "enum": [ "activity", "actor", "object", "target" ],
+ "required" : true,
+ "default": "activity"
+ },
+ "extension": {
+ "type": "string",
+ "description": "Extension identifier",
+ "required" : true
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
index aa718fb..de39262 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
@@ -78,7 +78,6 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
setProvider(activity);
setObjectType(post.getType(), activity);
parseObject(activity, mapper.convertValue(post, ObjectNode.class));
- fixObjectId(activity);
fixContentFromSummary(activity);
activity.setVerb("post");
List<String> links = Lists.newLinkedList();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 79d1608..f0d65f8 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -54,6 +54,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-processor-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
new file mode 100644
index 0000000..77965c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -0,0 +1,44 @@
+package org.apache.streams.twitter.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.SimpleHTTPGetProcessor;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 9/14/14.
+ */
+public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
+
+ public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
+ super(processorConfiguration);
+ this.configuration.setHostname("urls.api.twitter.com");
+ this.configuration.setResourceUri("/1/urls/count.json");
+ this.configuration.setExtension("twitter_url_count");
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ Activity activity = mapper.convertValue(entry, Activity.class);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(activity.getUrl()));
+ return super.process(entry);
+ }
+
+ @Override
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+ Map<String, String> params = Maps.newHashMap();
+
+ params.put("url", mapper.convertValue(entry, Activity.class).getUrl());
+
+ return params;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/pom.xml b/streams-pojo-extensions/pom.xml
new file mode 100644
index 0000000..7f3f1a5
--- /dev/null
+++ b/streams-pojo-extensions/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-project</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>streams-pojo-extensions</artifactId>
+
+ <name>streams-pojo-extensions</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
new file mode 100644
index 0000000..a8d068a
--- /dev/null
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -0,0 +1,94 @@
+package org.apache.streams.data.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import java.util.Map;
+
+public class ExtensionUtil {
+
+ /**
+ * Property on the activity object to use for extensions
+ */
+ public static final String EXTENSION_PROPERTY = "extensions";
+ /**
+ * The number of +1, Like, favorites, etc that the post has received
+ */
+ public static final String LIKES_EXTENSION = "likes";
+ /**
+ * The number of retweets, shares, etc that the post has received
+ */
+ public static final String REBROADCAST_EXTENSION = "rebroadcasts";
+ /**
+ * The language of the post
+ */
+ public static final String LANGUAGE_EXTENSION = "language";
+ /**
+ * Location that the post was made or the actor's residence
+ */
+ public static final String LOCATION_EXTENSION = "location";
+ /**
+ * Country that the post was made
+ */
+ public static final String LOCATION_EXTENSION_COUNTRY = "country";
+ /**
+ * Specific JSON-geo coordinates (long,lat)
+ */
+ public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
+
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ public static Map<String, Object> getExtensions(ObjectNode object) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ return extensions;
+ }
+
+ public static Object getExtension(ObjectNode object, String key) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ return extensions.get(key);
+ }
+
+ public static void setExtensions(ObjectNode object, Map<String, Object> extensions) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ activityObject.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
+ };
+
+ public static void addExtension(ObjectNode object, String key, Object extension) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ extensions.put(key, extension);
+ };
+
+ public static void addExtensions(ObjectNode object, Map<String, Object> extensions) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ for( Map.Entry<String, Object> item : extensions.entrySet())
+ activityObject.getAdditionalProperties().put(item.getKey(), item.getValue());
+ };
+
+ public static void removeExtension(ObjectNode object, String key) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ extensions.remove(key);
+ };
+
+ /**
+ * Creates a standard extension property
+ * @param object objectnode to create the property in
+ * @return the Map representing the extensions property
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> ensureExtensions(ObjectNode object) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ if(extensions == null) {
+ extensions = Maps.newHashMap();
+ setExtensions(object, extensions);
+ }
+ return getExtensions(object);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index 3684b32..04ee923 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -19,6 +19,7 @@
package org.apache.streams.data.util;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
@@ -61,7 +62,7 @@ public class ActivityUtil {
*/
public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
/**
* Creates a standard extension property
@@ -69,13 +70,14 @@ public class ActivityUtil {
* @return the Map representing the extensions property
*/
@SuppressWarnings("unchecked")
+ @Deprecated
public static Map<String, Object> ensureExtensions(Activity activity) {
- Map<String, Object> properties = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
- if(properties == null) {
- properties = new HashMap<String, Object>();
- activity.setAdditionalProperty(EXTENSION_PROPERTY, properties);
+ Map<String, Object> extensions = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ if(extensions == null) {
+ extensions = new HashMap<String, Object>();
+ activity.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
}
- return properties;
+ return extensions;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
index 45c2276..a68ce00 100644
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
+++ b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
@@ -8,7 +8,8 @@
"properties": {
"id" :{
"type" : "string",
- "description" : "Uniquely identifies each activity within the service"
+ "description" : "Uniquely identifies each activity within the service",
+ "required" : true
},
"actor" : {
"type": "object",
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
index d51db27..eec09a8 100644
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
+++ b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
@@ -8,7 +8,7 @@
"id" : {
"type" : "string",
"description" : "Provides a permanent, universally unique identifier for the object in the form of an absolute IRI [RFC3987]. An object SHOULD contain a single id property. If an object does not contain an id property, consumers MAY use the value of the url property as a less-reliable, non-unique identifier.",
- "default" : "{link}"
+ "required" : true
},
"image" : {
"format":"image",