You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/07 20:11:32 UTC

[01/12] incubator-streams git commit: just poms

Repository: incubator-streams
Updated Branches:
  refs/heads/master b8e7a69d0 -> ab9c6963d


just poms


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1464819f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1464819f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1464819f

Branch: refs/heads/master
Commit: 1464819fd7ea147830a4211ea2ac6af4aa715022
Parents: 35a8fbf
Author: sblackmon <sb...@apache.org>
Authored: Sun Sep 14 10:24:47 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Sun Sep 14 10:24:47 2014 -0500

----------------------------------------------------------------------
 streams-components/pom.xml                      |  62 ++++++++++
 .../streams-processor-http/pom.xml              | 114 +++++++++++++++++++
 2 files changed, 176 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1464819f/streams-components/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/pom.xml b/streams-components/pom.xml
new file mode 100644
index 0000000..26384b1
--- /dev/null
+++ b/streams-components/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-project</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-components</artifactId>
+
+    <packaging>pom</packaging>
+    <name>streams-components</name>
+
+    <properties>
+
+    </properties>
+
+    <modules>
+        <module>streams-processor-http</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-config</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-core</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.streams</groupId>
+                <artifactId>streams-pojo</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1464819f/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml
new file mode 100644
index 0000000..67538aa
--- /dev/null
+++ b/streams-components/streams-processor-http/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-components</artifactId>
+        <version>0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath> <!-- i.e. streams-components/pom.xml, which lists this module -->
+    </parent>
+
+    <artifactId>streams-processor-http</artifactId>
+
+    <name>streams-processor-http</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>add-source-jaxb2</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jaxb2</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.http</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>


[08/12] incubator-streams git commit: a few package name changes working twitterurlapiprocess which uses streams-http (w/o authentication) working people pattern processors which use streams-http (w/ authentication)

Posted by sb...@apache.org.
a few package name changes
working twitterurlapiprocess which uses streams-http (w/o authentication)
working people pattern processors which use streams-http (w/ authentication)


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0c8e67ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0c8e67ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0c8e67ce

Branch: refs/heads/master
Commit: 0c8e67ce26e131282aaaa03815ad6daefd684346
Parents: ad5f90c
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 23:13:29 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 23:13:29 2014 -0500

----------------------------------------------------------------------
 .../http/processor/SimpleHTTPGetProcessor.java  | 50 +++++++++++++++-----
 .../peoplepattern/AccountTypeProcessor.java     |  7 ++-
 .../peoplepattern/DemographicsProcessor.java    |  7 ++-
 .../streams-provider-twitter/pom.xml            |  2 +-
 .../processor/TwitterUrlApiProcessor.java       | 21 ++++----
 .../provider/TwitterTimelineProvider.java       |  8 ++--
 .../apache/streams/data/util/ExtensionUtil.java | 37 ++++++---------
 7 files changed, 77 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index 0d17cc6..d3d4429 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -21,6 +21,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.data.util.ExtensionUtil;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.ActivityObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,15 +100,30 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
         }
 
     }
-        /**
-         Override this to place result in non-standard location on document
-         */
-    protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+    /**
+     Override this to place result in non-standard location on document
+     */
+    protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
+
+        if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+            return mapper.convertValue(rootDocument, ActivityObject.class);
+        else
+            return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class);
+
+    }
+
+    /**
+     Override this to place result in non-standard location on document
+     */
+    protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) {
 
         if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
-            return rootDocument;
+            return mapper.convertValue(activityObject, ObjectNode.class);
         else
-            return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
+            rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class));
+
+        return rootDocument;
 
     }
 
@@ -150,9 +166,6 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
             try {
                 response.close();
             } catch (IOException e) {}
-            try {
-                httpclient.close();
-            } catch (IOException e) {}
         }
 
         if( entityString == null )
@@ -162,12 +175,12 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
 
         ObjectNode extensionFragment = prepareExtensionFragment(entityString);
 
-        ObjectNode extensionEntity = getEntityToExtend(rootDocument);
-
-        ExtensionUtil.ensureExtensions(extensionEntity);
+        ActivityObject extensionEntity = getEntityToExtend(rootDocument);
 
         ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
 
+        rootDocument = setEntityToExtend(rootDocument, extensionEntity);
+
         entry.setDocument(rootDocument);
 
         result.add(entry);
@@ -220,5 +233,18 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
     @Override
     public void cleanUp() {
         LOGGER.info("shutting down SimpleHTTPGetProcessor");
+        // This change stops closing the client after each response, so the
+        // client is released here instead -- once, and only if it exists.
+        if (httpclient != null) {
+            try {
+                httpclient.close();
+            } catch (IOException e) {
+                // Best-effort shutdown: report through the logger, do not rethrow.
+                LOGGER.warn("failed to close http client", e);
+            } finally {
+                // Clear the reference so a repeated cleanUp() skips the close.
+                httpclient = null;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
index d180b7f..edcf4d3 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.peoplepattern;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
 import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
@@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.data.util.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
     @Override
     protected Map<String, String> prepareParams(StreamsDatum entry) {
         Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
-        //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
         Actor actor = activity.getActor();
-        ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
-        String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+        ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class);
+        String username = (String) ExtensionUtil.getExtension(actorObject, "screenName");
         Map<String, String> params = Maps.newHashMap();
         params.put("id", actor.getId());
         params.put("name", actor.getDisplayName());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
index 6ffbb9b..60db379 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.peoplepattern;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
 import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
@@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.data.util.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class DemographicsProcessor extends SimpleHTTPGetProcessor {
     @Override
     protected Map<String, String> prepareParams(StreamsDatum entry) {
         Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
-        //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
         Actor actor = activity.getActor();
-        ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
-        String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+        ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class);
+        String username = (String) ExtensionUtil.getExtension(actorObject, "screenName");
         Map<String, String> params = Maps.newHashMap();
         params.put("id", actor.getId());
         params.put("name", actor.getDisplayName());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index f0d65f8..604e5a7 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -55,7 +55,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
-            <artifactId>streams-processor-http</artifactId>
+            <artifactId>streams-http</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 438937f..54e1369 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -1,11 +1,10 @@
 package org.apache.streams.twitter.processor;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
-import org.apache.streams.components.http.SimpleHTTPGetProcessor;
-import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
@@ -21,23 +20,27 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St
     public TwitterUrlApiProcessor() {
         super();
         this.configuration.setHostname("urls.api.twitter.com");
-        this.configuration.setResourceUri("/1/urls/count.json");
+        this.configuration.setResourcePath("/1/urls/count.json");
+        this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
         this.configuration.setExtension("twitter_url_count");
     }
 
     public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
         super(processorConfiguration);
         this.configuration.setHostname("urls.api.twitter.com");
-        this.configuration.setResourceUri("/1/urls/count.json");
+        this.configuration.setResourcePath("/1/urls/count.json");
+        this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY);
         this.configuration.setExtension("twitter_url_count");
     }
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
         Preconditions.checkArgument(entry.getDocument() instanceof Activity);
-        Activity activity = mapper.convertValue(entry, Activity.class);
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(activity.getUrl()));
-        return super.process(entry);
+        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        if( activity.getLinks() != null && activity.getLinks().size() > 0)
+            return super.process(entry);
+        else
+            return Lists.newArrayList(entry);
     }
 
     @Override
@@ -45,7 +48,7 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St
 
         Map<String, String> params = Maps.newHashMap();
 
-        params.put("url", mapper.convertValue(entry, Activity.class).getUrl());
+        params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0));
 
         return params;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index ae755c2..86395a2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -19,9 +19,9 @@
 package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -31,14 +31,16 @@ import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 import twitter4j.*;
 import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
index a8d068a..7ce013c 100644
--- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -1,7 +1,6 @@
 package org.apache.streams.data.util;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.ActivityObject;
@@ -41,38 +40,33 @@ public class ExtensionUtil {
 
     private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    public static Map<String, Object> getExtensions(ObjectNode object) {
+    public static Map<String, Object> getExtensions(ActivityObject object) {
         ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+        Map<String,Object> extensions = ensureExtensions(object);
         return extensions;
     }
 
-    public static Object getExtension(ObjectNode object, String key) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+    public static Object getExtension(ActivityObject object, String key) {
+        Map<String,Object> extensions = ensureExtensions(object);
         return extensions.get(key);
     }
 
-    public static void setExtensions(ObjectNode object, Map<String, Object> extensions) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        activityObject.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
+    public static void setExtensions(ActivityObject object, Map<String, Object> extensions) {
+        object.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
     };
 
-    public static void addExtension(ObjectNode object, String key, Object extension) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+    public static void addExtension(ActivityObject object, String key, Object extension) {
+        Map<String,Object> extensions = ensureExtensions(object);
         extensions.put(key, extension);
     };
 
-    public static void addExtensions(ObjectNode object, Map<String, Object> extensions) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+    public static void addExtensions(ActivityObject object, Map<String, Object> extensions) {
         for( Map.Entry<String, Object> item : extensions.entrySet())
-            activityObject.getAdditionalProperties().put(item.getKey(), item.getValue());
+            addExtension(object, item.getKey(), item.getValue());
     };
 
-    public static void removeExtension(ObjectNode object, String key) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+    public static void removeExtension(ActivityObject object, String key) {
+        Map<String,Object> extensions = ensureExtensions(object);
         extensions.remove(key);
     };
 
@@ -82,13 +76,12 @@ public class ExtensionUtil {
      * @return the Map representing the extensions property
      */
     @SuppressWarnings("unchecked")
-    public static Map<String, Object> ensureExtensions(ObjectNode object) {
-        ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
-        Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+    public static Map<String, Object> ensureExtensions(ActivityObject object) {
+        Map<String,Object> extensions = (Map<String,Object>) object.getAdditionalProperties().get(EXTENSION_PROPERTY);
         if(extensions == null) {
             extensions = Maps.newHashMap();
             setExtensions(object, extensions);
         }
-        return getExtensions(object);
+        return extensions;
     }
 }


[10/12] incubator-streams git commit: simple Integration Test

Posted by sb...@apache.org.
simple Integration Test


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbd7d890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbd7d890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbd7d890

Branch: refs/heads/master
Commit: dbd7d890804391025cf085f8a1c141a643d0b94b
Parents: b8e7a69
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 09:45:32 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 7 10:47:21 2014 -0800

----------------------------------------------------------------------
 pom.xml                                         | 56 ++++++++++++++++++++
 streams-contrib/streams-persist-console/pom.xml | 34 ++++++++++++
 .../streams/console/ConsolePersistReader.java   | 11 ++--
 .../streams/console/ConsolePersistWriter.java   | 11 ++--
 4 files changed, 103 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbd7d890/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f8b6a14..e86f02d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,8 @@
         <jaxb2-basics.version>0.8.4</jaxb2-basics.version>
         <jaxbutil.version>1.2.6</jaxbutil.version>
         <junit.version>4.11</junit.version>
+        <surefire.plugin.version>2.17</surefire.plugin.version>
+        <failsafe.plugin.version>2.17</failsafe.plugin.version>
         <slf4j.version>1.7.6</slf4j.version>
         <logback.version>1.1.1</logback.version>
         <commons-io.version>2.4</commons-io.version>
@@ -90,6 +92,7 @@
         <facebook4j.version>2.1.0</facebook4j.version>
         <maven.enforcer.plugin.version>1.3.1</maven.enforcer.plugin.version>
         <mockito.version>1.9.5</mockito.version>
+        <powermock.version>1.5.6</powermock.version>
     </properties>
 
     <modules>
@@ -177,6 +180,41 @@
                     <artifactId>build-helper-maven-plugin</artifactId>
                     <version>${build-helper.version}</version>
                 </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>${failsafe.plugin.version}</version>
+                    <executions>
+                        <execution>
+                            <id>integration-test</id>
+                            <goals>
+                                <goal>integration-test</goal>
+                                <goal>verify</goal>
+                            </goals>
+                            <configuration>
+                                <!-- Sets the VM argument line used when integration tests are run. -->
+                                <argLine>${failsafeArgLine}</argLine>
+                                <!-- Skips integration tests if the value of skip.integration.tests property is true -->
+                                <skipTests>${skip.integration.tests}</skipTests>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>${surefire.plugin.version}</version>
+                    <configuration>
+                        <!-- Sets the VM argument line used when unit tests are run. -->
+                        <argLine>${surefireArgLine}</argLine>
+                        <!-- Skips unit tests if the value of skip.unit.tests property is true -->
+                        <skipTests>${skip.unit.tests}</skipTests>
+                        <!-- Excludes integration tests when unit tests are run. -->
+                        <excludes>
+                            <exclude>**/IT*.java</exclude>
+                        </excludes>
+                    </configuration>
+                </plugin>
             </plugins>
         </pluginManagement>
     </build>
@@ -231,6 +269,24 @@
                 <scope>test</scope>
             </dependency>
             <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-module-junit4</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.powermock</groupId>
+                <artifactId>powermock-api-mockito</artifactId>
+                <version>${powermock.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
                 <version>${slf4j.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbd7d890/streams-contrib/streams-persist-console/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/pom.xml b/streams-contrib/streams-persist-console/pom.xml
index c7f2cd3..02ec403 100644
--- a/streams-contrib/streams-persist-console/pom.xml
+++ b/streams-contrib/streams-persist-console/pom.xml
@@ -26,5 +26,39 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+        </dependency>
     </dependencies>
+    <build>
+    <sourceDirectory>src/main/java</sourceDirectory>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
+    <resources>
+        <resource>
+            <directory>src/main/resources</directory>
+        </resource>
+    </resources>
+    <testResources>
+        <testResource>
+            <directory>src/test/resources</directory>
+        </testResource>
+    </testResources>
+    </build>
+
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbd7d890/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
index 776d5a3..8afba85 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.InputStream;
+import java.io.PrintStream;
 import java.math.BigInteger;
 import java.util.Queue;
 import java.util.Scanner;
@@ -44,16 +45,16 @@ public class ConsolePersistReader implements StreamsPersistReader {
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    protected InputStream inputStream = System.in;
 
     public ConsolePersistReader() {
         this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
     }
 
-    public ConsolePersistReader(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
+    public ConsolePersistReader(InputStream inputStream) {
+        this();
+        this.inputStream = inputStream;
     }
-
     public void prepare(Object o) {
 
     }
@@ -77,7 +78,7 @@ public class ConsolePersistReader implements StreamsPersistReader {
 
         LOGGER.info("{} readCurrent", STREAMS_ID);
 
-        Scanner sc = new Scanner(System.in);
+        Scanner sc = new Scanner(inputStream);
 
         while( sc.hasNextLine() ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbd7d890/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index 96d116f..53bb8d7 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -27,6 +27,7 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.PrintStream;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -34,6 +35,8 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
 
+    protected PrintStream printStream = System.out;
+
     protected volatile Queue<StreamsDatum> persistQueue;
 
     private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@@ -42,8 +45,9 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
         this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
     }
 
-    public ConsolePersistWriter(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
+    public ConsolePersistWriter(PrintStream printStream) {
+        this();
+        this.printStream = printStream;
     }
 
     public void prepare(Object o) {
@@ -61,8 +65,7 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
 
             String text = mapper.writeValueAsString(entry);
 
-            System.out.println("\n"+text+"\n");
-//            LOGGER.info(text);
+            printStream.println(text);
 
         } catch (JsonProcessingException e) {
             LOGGER.warn("save: {}", e);


[07/12] incubator-streams git commit: removing validation, as jackson does not contain a default implementation until 2.5 release

Posted by sb...@apache.org.
removing validation, as jackson does not contain a default implementation until 2.5 release


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ad5f90cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ad5f90cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ad5f90cc

Branch: refs/heads/master
Commit: ad5f90cc115c40e0ef480a85957bbb4c98157cf6
Parents: 9a57532
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 20:21:22 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 20:21:22 2014 -0500

----------------------------------------------------------------------
 streams-components/streams-http/pom.xml                        | 6 ------
 .../components/http/processor/SimpleHTTPGetProcessor.java      | 6 ------
 .../components/http/provider/SimpleHTTPGetProvider.java        | 3 ---
 3 files changed, 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ad5f90cc/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml
index 9c2b079..39a4faa 100644
--- a/streams-components/streams-http/pom.xml
+++ b/streams-components/streams-http/pom.xml
@@ -38,12 +38,6 @@
             <artifactId>jsonschema2pojo-core</artifactId>
             <type>jar</type>
             <scope>compile</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.validation</groupId>
-                    <artifactId>validation-api</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ad5f90cc/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index c2bfef6..0d17cc6 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -3,7 +3,6 @@ package org.apache.streams.components.http.processor;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -25,8 +24,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.validation.Validation;
-import javax.validation.ValidatorFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -199,9 +196,6 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor {
     @Override
     public void prepare(Object configurationObject) {
 
-        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-        Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
-
         mapper = StreamsJacksonMapper.getInstance();
 
         uriBuilder = new URIBuilder()

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ad5f90cc/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
index 622225a..36084c7 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
@@ -89,9 +89,6 @@ public class SimpleHTTPGetProvider implements StreamsProvider {
     @Override
     public void prepare(Object configurationObject) {
 
-//        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-//        Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
-
         mapper = StreamsJacksonMapper.getInstance();
 
         uriBuilder = new URIBuilder()


[11/12] incubator-streams git commit: adds arbitrary Joda format support to StreamsJacksonMapper

Posted by sb...@apache.org.
adds arbitrary Joda format support to StreamsJacksonMapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e83659c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e83659c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e83659c5

Branch: refs/heads/master
Commit: e83659c51138b285dfa738739a765c7a222890c5
Parents: dbd7d89
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 13:58:11 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 7 10:47:21 2014 -0800

----------------------------------------------------------------------
 .../jackson/StreamsDateTimeDeserializer.java    | 23 +++++++++++++++++++-
 .../streams/jackson/StreamsJacksonMapper.java   | 20 +++++++++++++++++
 .../streams/jackson/StreamsJacksonModule.java   |  9 ++++++++
 3 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e83659c5/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
index e0b98b2..8f53954 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
@@ -21,23 +21,44 @@ package org.apache.streams.jackson;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.google.common.collect.Lists;
 import org.apache.streams.data.util.RFC3339Utils;
 import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * Created by sblackmon on 3/27/14.
  */
 public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> implements Serializable {
 
+    List<DateTimeFormatter> formatters = Lists.newArrayList();
+
     protected StreamsDateTimeDeserializer(Class<DateTime> dateTimeClass) {
         super(dateTimeClass);
     }
 
+    protected StreamsDateTimeDeserializer(Class<DateTime> dateTimeClass, List<String> formats) {
+        super(dateTimeClass);
+        for( String format : formats )
+            formatters.add(DateTimeFormat.forPattern(format));
+    }
+
     @Override
     public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
-        return RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());
+
+        DateTime result = RFC3339Utils.parseToUTC(jpar.getValueAsString());
+        Iterator<DateTimeFormatter> iterator = formatters.iterator();
+        while( result == null && iterator.hasNext()) {
+            DateTimeFormatter formatter = iterator.next();
+            result = formatter.parseDateTime(jpar.getValueAsString());
+        }
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e83659c5/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
index 25c0c89..8a74caa 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 
+import java.util.List;
+
 /**
  * Created by sblackmon on 3/27/14.
  */
@@ -36,9 +38,27 @@ public class StreamsJacksonMapper extends ObjectMapper {
         return INSTANCE;
     }
 
+    public static StreamsJacksonMapper getInstance(List<String> formats){
+
+        StreamsJacksonMapper instance = new StreamsJacksonMapper(formats);
+
+        return instance;
+
+    }
+
     public StreamsJacksonMapper() {
         super();
         registerModule(new StreamsJacksonModule());
+        configure();
+    }
+
+    public StreamsJacksonMapper(List<String> formats) {
+        super();
+        registerModule(new StreamsJacksonModule(formats));
+        configure();
+    }
+
+    public void configure() {
         disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
         configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
         configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e83659c5/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index 2869414..8b44b0f 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 
+import java.util.List;
+
 /**
  * Created by sblackmon on 3/27/14.
  */
@@ -36,5 +38,12 @@ public class StreamsJacksonModule extends SimpleModule {
         addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class));
     }
 
+    public StreamsJacksonModule(List<String> formats) {
+        super();
+        addSerializer(DateTime.class, new StreamsDateTimeSerializer(DateTime.class));
+        addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class, formats));
 
+        addSerializer(Period.class, new StreamsPeriodSerializer(Period.class));
+        addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class));
+    }
 }


[09/12] incubator-streams git commit: added license header and javadoc comments

Posted by sb...@apache.org.
added license header and javadoc comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d2ffe62b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d2ffe62b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d2ffe62b

Branch: refs/heads/master
Commit: d2ffe62baf88618c7c8a6b9a1e5a79d21ed9e3fd
Parents: 0c8e67c
Author: sblackmon <sb...@apache.org>
Authored: Mon Oct 13 17:07:19 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Oct 13 17:07:19 2014 -0500

----------------------------------------------------------------------
 .../components/http/HttpConfigurator.java       |  2 +-
 .../http/processor/SimpleHTTPGetProcessor.java  | 18 +++++++++++++++++
 .../http/provider/SimpleHTTPGetProvider.java    | 18 +++++++++++++++++
 .../peoplepattern/AccountTypeProcessor.java     |  2 +-
 .../processor/TwitterUrlApiProcessor.java       | 20 ++++++++++++++++++-
 .../apache/streams/data/util/ExtensionUtil.java | 21 ++++++++++++++++++++
 6 files changed, 78 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
index 900831f..979a680 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
+ * Converts a {@link com.typesafe.config.Config} element into an instance of HttpConfiguration
  */
 public class HttpConfigurator {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index d3d4429..b8c957c 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.components.http.processor;
 
 import com.fasterxml.jackson.core.JsonProcessingException;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
index 36084c7..118d06b 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.components.http.provider;
 
 import com.fasterxml.jackson.databind.JsonNode;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
index edcf4d3..4e4a6af 100644
--- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 /**
- * Enrich actor with demographics
+ * Enrich actor with account type
  */
 public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 54e1369..17ce411 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.twitter.processor;
 
 import com.google.common.base.Preconditions;
@@ -13,7 +31,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Created by sblackmon on 9/14/14.
+ * Class gets a global share count from Twitter API for links on Activity datums
  */
 public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d2ffe62b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
index 7ce013c..1e0e384 100644
--- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.data.util;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -7,6 +25,9 @@ import org.apache.streams.pojo.json.ActivityObject;
 
 import java.util.Map;
 
+/**
+ *  Class makes it easier to manage extensions added to activities, actors, objects, etc...
+ */
 public class ExtensionUtil {
 
     /**


[06/12] incubator-streams git commit: committing PeoplePattern processor

Posted by sb...@apache.org.
committing PeoplePattern processor


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9a575322
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9a575322
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9a575322

Branch: refs/heads/master
Commit: 9a575322231e4a8a69c56f06da743ffe3211ccb4
Parents: d62061d
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 20:20:52 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 20:20:52 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   5 +-
 .../streams-processor-peoplepattern/pom.xml     | 138 +++++++++++++++++++
 .../peoplepattern/AccountTypeProcessor.java     |  76 ++++++++++
 .../peoplepattern/DemographicsProcessor.java    |  77 +++++++++++
 .../streams/peoplepattern/AccountType.json      |  27 ++++
 .../streams/peoplepattern/Demographics.json     |  60 ++++++++
 .../resources/templates/peoplepatternactor.json |  25 ++++
 7 files changed, 406 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index e290466..fcec297 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,17 +44,18 @@
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
-		<module>streams-amazon-aws</module>
+        <module>streams-amazon-aws</module>
         <!--<module>streams-processor-lucene</module>-->
         <!--<module>streams-processor-tika</module>-->
-        <module>streams-provider-instagram</module>
         <module>streams-processor-jackson</module>
         <module>streams-processor-json</module>
         <module>streams-processor-urls</module>
+        <module>streams-processor-peoplepattern</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>
         <module>streams-provider-google</module>
         <module>streams-provider-gnip</module>
+        <module>streams-provider-instagram</module>
         <module>streams-provider-moreover</module>
         <module>streams-provider-twitter</module>
         <module>streams-provider-sysomos</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/pom.xml b/streams-contrib/streams-processor-peoplepattern/pom.xml
new file mode 100644
index 0000000..b810200
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-processor-peoplepattern</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+                <plugin>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>build-helper-maven-plugin</artifactId>
+                    <executions>
+                        <execution>
+                            <id>add-source</id>
+                            <phase>generate-sources</phase>
+                            <goals>
+                                <goal>add-source</goal>
+                            </goals>
+                            <configuration>
+                                <sources>
+                                    <source>target/generated-sources/jsonschema2pojo/**/*.java</source>
+                                </sources>
+                            </configuration>
+                        </execution>
+                        <execution>
+                            <id>add-source-jaxb2</id>
+                            <phase>generate-sources</phase>
+                            <goals>
+                                <goal>add-source</goal>
+                            </goals>
+                            <configuration>
+                                <sources>
+                                    <source>target/generated-sources/jaxb2</source>
+                                </sources>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.jsonschema2pojo</groupId>
+                    <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                    <configuration>
+                        <addCompileSourceRoot>true</addCompileSourceRoot>
+                        <generateBuilders>true</generateBuilders>
+                        <sourcePaths>
+                            <sourcePath>src/main/jsonschema/org/apache/streams/peoplepattern</sourcePath>
+                        </sourcePaths>
+                        <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                        <targetPackage>org.apache.streams.peoplepattern</targetPackage>
+                        <useLongIntegers>true</useLongIntegers>
+                        <useJodaDates>true</useJodaDates>
+                    </configuration>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>generate</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
new file mode 100644
index 0000000..d180b7f
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Enrich actor with account type
+ */
+public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
+
+    private final static String STREAMS_ID = "AccountTypeProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class);
+
+    public AccountTypeProcessor() {
+        this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern")));
+    }
+
+    public AccountTypeProcessor(HttpProcessorConfiguration peoplePatternConfiguration) {
+        super(peoplePatternConfiguration);
+        LOGGER.info("creating AccountTypeProcessor");
+        configuration.setProtocol("https");
+        configuration.setHostname("api.peoplepattern.com");
+        configuration.setResourcePath("/v0.2/account_type/");
+        configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
+        configuration.setExtension("account_type");
+    }
+
+    /**
+     Builds the request parameters (id, name, username, description) from the activity's actor
+     */
+    @Override
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
+        Actor actor = activity.getActor();
+        ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
+        String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+        Map<String, String> params = Maps.newHashMap();
+        params.put("id", actor.getId());
+        params.put("name", actor.getDisplayName());
+        params.put("username", username);
+        params.put("description", actor.getSummary());
+        return params;
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
new file mode 100644
index 0000000..6ffbb9b
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Enrich actor with demographics
+ */
+public class DemographicsProcessor extends SimpleHTTPGetProcessor {
+
+    public final static String STREAMS_ID = "DemographicsProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class);
+
+    public DemographicsProcessor() {
+        this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern")));
+    }
+
+    public DemographicsProcessor(HttpProcessorConfiguration peoplePatternConfiguration) {
+        super(peoplePatternConfiguration);
+        LOGGER.info("creating DemographicsProcessor");
+        configuration.setProtocol("https");
+        configuration.setHostname("api.peoplepattern.com");
+        configuration.setResourcePath("/v0.2/demographics/");
+        configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
+        configuration.setExtension("demographics");
+    }
+
+    /**
+     Builds the request parameters (id, name, username, description) from the activity's actor
+     */
+    @Override
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class);
+        Actor actor = activity.getActor();
+        ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
+        String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+        Map<String, String> params = Maps.newHashMap();
+        params.put("id", actor.getId());
+        params.put("name", actor.getDisplayName());
+        params.put("username", username);
+        params.put("description", actor.getSummary());
+        return params;
+    }
+
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
new file mode 100644
index 0000000..5656b44
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
@@ -0,0 +1,27 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType": "org.apache.streams.peoplepattern.AccountType",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "prediction" : {
+            "type" : "string",
+            "enum" : [
+                "person",
+                "organization",
+                "entertainment",
+                "adult",
+                "spam",
+                "no-prediction"
+            ],
+            "default": "no-prediction"
+        },
+        "score": {
+            "type": "number"
+        },
+        "id": {
+            "type": "string"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
new file mode 100644
index 0000000..d1f64d8
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
@@ -0,0 +1,60 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType": "org.apache.streams.peoplepattern.Demographics",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "age": {
+            "type": "object",
+            "properties": {
+                "prediction": {
+                    "type": "integer",
+                    "default": 1990
+                },
+                "score": {
+                    "type": "number"
+                }
+
+            }
+        },
+        "gender" : {
+            "type": "object",
+            "properties": {
+                "prediction": {
+                    "type": "string",
+                    "enum": [
+                        "male",
+                        "female",
+                        "no-prediction"
+                    ],
+                    "default": "no-prediction"
+                },
+                "score": {
+                    "type": "number"
+                }
+            }
+        },
+        "race" : {
+            "type": "object",
+            "properties": {
+                "prediction": {
+                    "type": "string",
+                    "enum": [
+                        "black",
+                        "east-asian",
+                        "hispanic",
+                        "middle-eastern",
+                        "south-asian",
+                        "white",
+                        "no-prediction"
+                    ],
+                    "default": "no-prediction"
+                },
+                "score": {
+                    "type": "number"
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
new file mode 100644
index 0000000..9a24c5c
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
@@ -0,0 +1,25 @@
+{
+	"order": 20,
+	"template": "*activity*",
+	"settings": {},
+	"mappings": {
+        "activity": {
+            "properties": {
+                "actor": {
+                    "properties": {
+                        "extensions": {
+                            "properties": {
+                                "account_type": {
+                                    "type": "nested"
+                                },
+                                "demographics": {
+                                    "type": "nested"
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file


[12/12] incubator-streams git commit: Merge branch 'STREAMS-168'

Posted by sb...@apache.org.
Merge branch 'STREAMS-168'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ab9c6963
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ab9c6963
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ab9c6963

Branch: refs/heads/master
Commit: ab9c6963d8d3524ab88703a73a6c0e05842a5fee
Parents: e83659c d2ffe62
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 7 10:48:50 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 7 10:48:50 2014 -0800

----------------------------------------------------------------------
 pom.xml                                         |   2 +
 streams-components/pom.xml                      |  62 +++++
 streams-components/streams-http/README.md       |  16 ++
 streams-components/streams-http/pom.xml         | 153 +++++++++++
 .../components/http/HttpConfigurator.java       |  62 +++++
 .../http/processor/SimpleHTTPGetProcessor.java  | 268 +++++++++++++++++++
 .../http/provider/SimpleHTTPGetProvider.java    | 230 ++++++++++++++++
 .../components/http/HttpConfiguration.json      |  50 ++++
 .../http/HttpProcessorConfiguration.json        |  28 ++
 .../http/HttpProviderConfiguration.json         |  18 ++
 streams-contrib/pom.xml                         |   5 +-
 .../streams-processor-peoplepattern/pom.xml     | 138 ++++++++++
 .../peoplepattern/AccountTypeProcessor.java     |  75 ++++++
 .../peoplepattern/DemographicsProcessor.java    |  76 ++++++
 .../streams/peoplepattern/AccountType.json      |  27 ++
 .../streams/peoplepattern/Demographics.json     |  60 +++++
 .../resources/templates/peoplepatternactor.json |  25 ++
 .../api/FacebookPostActivitySerializer.java     |   1 -
 .../streams-provider-twitter/pom.xml            |   5 +
 .../processor/TwitterUrlApiProcessor.java       |  73 +++++
 .../provider/TwitterTimelineProvider.java       |   7 +-
 streams-pojo-extensions/pom.xml                 |  64 +++++
 .../apache/streams/data/util/ExtensionUtil.java | 108 ++++++++
 .../apache/streams/data/util/ActivityUtil.java  |  14 +-
 .../org/apache/streams/pojo/json/activity.json  |   3 +-
 .../org/apache/streams/pojo/json/object.json    |   2 +-
 26 files changed, 1559 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab9c6963/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab9c6963/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab9c6963/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ab9c6963/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------


[04/12] incubator-streams git commit: class need not be abstract rootDocument is what should exit processor

Posted by sb...@apache.org.
class need not be abstract
rootDocument is what should exit processor


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a5764609
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a5764609
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a5764609

Branch: refs/heads/master
Commit: a576460911464cf260e37886ce840f4e57dd5f30
Parents: b8ccf9f
Author: sblackmon <sb...@apache.org>
Authored: Tue Sep 16 14:24:53 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Sep 16 14:24:53 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/components/http/SimpleHTTPGetProcessor.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a5764609/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
index dec9d03..d74793a 100644
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
+public class SimpleHTTPGetProcessor implements StreamsProcessor {
 
     private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
 
@@ -152,7 +152,7 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
         try {
             response = httpclient.execute(httpget);
             HttpEntity entity = response.getEntity();
-            // TODO: handle rate-limiting
+            // TODO: handle retry
             if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
                 entityString = EntityUtils.toString(entity);
             }
@@ -181,7 +181,7 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
 
         ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
 
-        entry.setDocument(extensionEntity);
+        entry.setDocument(rootDocument);
 
         result.add(entry);
 


[05/12] incubator-streams git commit: introduced a SimpleHTTPGetProvider reorganized configuration added access_token support added http basic auth support

Posted by sb...@apache.org.
introduced a SimpleHTTPGetProvider
reorganized configuration
added access_token support
added http basic auth support


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d62061de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d62061de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d62061de

Branch: refs/heads/master
Commit: d62061dec8628484e66d3494e02f1f65efafda41
Parents: a576460
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 17:49:01 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 17:49:01 2014 -0500

----------------------------------------------------------------------
 streams-components/pom.xml                      |   2 +-
 streams-components/streams-http/README.md       |  16 ++
 streams-components/streams-http/pom.xml         | 159 +++++++++++++
 .../components/http/HttpConfigurator.java       |  62 +++++
 .../http/processor/SimpleHTTPGetProcessor.java  | 230 +++++++++++++++++++
 .../http/provider/SimpleHTTPGetProvider.java    | 215 +++++++++++++++++
 .../components/http/HttpConfiguration.json      |  50 ++++
 .../http/HttpProcessorConfiguration.json        |  28 +++
 .../http/HttpProviderConfiguration.json         |  18 ++
 .../streams-processor-http/README.md            |  16 --
 .../streams-processor-http/pom.xml              | 154 -------------
 .../components/http/HttpConfigurator.java       |  53 -----
 .../components/http/SimpleHTTPGetProcessor.java | 218 ------------------
 .../HttpProcessorConfiguration.json             |  47 ----
 14 files changed, 779 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/pom.xml b/streams-components/pom.xml
index 26384b1..9942e14 100644
--- a/streams-components/pom.xml
+++ b/streams-components/pom.xml
@@ -37,7 +37,7 @@
     </properties>
 
     <modules>
-        <module>streams-processor-http</module>
+        <module>streams-http</module>
     </modules>
 
     <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/README.md b/streams-components/streams-http/README.md
new file mode 100644
index 0000000..62dd4c1
--- /dev/null
+++ b/streams-components/streams-http/README.md
@@ -0,0 +1,16 @@
+streams-http
+=====================
+
+Hit an http endpoint and place the result in extensions
+
+Example SimpleHTTPGetProcessor configuration:
+
+    "http": {
+        "protocol": "http",
+        "hostname": "urls.api.twitter.com",
+        "port": 9300,
+        "resourceUri": "1/urls/count.json"
+    }
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml
new file mode 100644
index 0000000..9c2b079
--- /dev/null
+++ b/streams-components/streams-http/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-components</artifactId>
+        <version>0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>streams-http</artifactId>
+
+    <name>streams-http</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+            <type>jar</type>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.validation</groupId>
+                    <artifactId>validation-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo-extensions</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.3.5</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>add-source-jaxb2</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jaxb2</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.http</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                    <includeJsr303Annotations>true</includeJsr303Annotations>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
new file mode 100644
index 0000000..900831f
--- /dev/null
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts a {@link com.typesafe.config.Config} element into an HttpProviderConfiguration or HttpProcessorConfiguration
+ */
+public class HttpConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class);
+
+    private final static ObjectMapper mapper = new ObjectMapper();
+
+    /** Binds config to an HttpProviderConfiguration; logs a warning and returns null when binding fails */
+    public static HttpProviderConfiguration detectProviderConfiguration(Config config) {
+
+        HttpProviderConfiguration httpProviderConfiguration = null;
+        try {
+            httpProviderConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProviderConfiguration.class);
+        } catch (Exception e) {
+            // hand the exception itself to slf4j so the stack trace lands in the log, not on stderr
+            LOGGER.warn("Could not parse http configuration", e);
+        }
+        return httpProviderConfiguration;
+    }
+
+    /** Binds config to an HttpProcessorConfiguration; logs a warning and returns null when binding fails */
+    public static HttpProcessorConfiguration detectProcessorConfiguration(Config config) {
+
+        HttpProcessorConfiguration httpProcessorConfiguration = null;
+        try {
+            httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class);
+        } catch (Exception e) {
+            // hand the exception itself to slf4j so the stack trace lands in the log, not on stderr
+            LOGGER.warn("Could not parse http configuration", e);
+        }
+        return httpProcessorConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
new file mode 100644
index 0000000..c2bfef6
--- /dev/null
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -0,0 +1,230 @@
+package org.apache.streams.components.http.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.Validation;
+import javax.validation.ValidatorFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Processor retrieves contents from a known url and stores the resulting object in an extension field
+ */
+public class SimpleHTTPGetProcessor implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+    // from root config id
+    private final static String EXTENSION = "account_type";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
+
+    protected ObjectMapper mapper;
+
+    // base request uri, assembled once in prepare() and copied for every request
+    protected URIBuilder uriBuilder;
+
+    // shared client: created in prepare(), closed in cleanUp()
+    protected CloseableHttpClient httpclient;
+
+    protected HttpProcessorConfiguration configuration;
+
+    // base64 of "username:password"; stays null unless both are configured
+    protected String authHeader;
+
+    /** Builds the processor from the "http" element of the global streams configuration */
+    public SimpleHTTPGetProcessor() {
+        this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http")));
+    }
+
+    /** Builds the processor from an explicit configuration; call prepare() before process() */
+    public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
+        LOGGER.info("creating SimpleHTTPGetProcessor");
+        LOGGER.info("{}", processorConfiguration);
+        this.configuration = processorConfiguration;
+    }
+
+    /**
+     Override this to store a result other than exact json representation of response
+     @return the parsed response body, or null when it cannot be read as a json object
+     */
+    protected ObjectNode prepareExtensionFragment(String entityString) {
+        try {
+            return mapper.readValue(entityString, ObjectNode.class);
+        } catch (IOException e) {
+            LOGGER.warn(e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     Override this to take the document to extend from somewhere other than the datum's document
+     @return the datum's document as a json object, or null when it cannot be converted
+     */
+    protected ObjectNode getRootDocument(StreamsDatum datum) {
+        try {
+            String json = datum.getDocument() instanceof String ?
+                    (String) datum.getDocument() :
+                    mapper.writeValueAsString(datum.getDocument());
+            return mapper.readValue(json, ObjectNode.class);
+        } catch (IOException e) {
+            // JsonProcessingException is an IOException, so one handler covers both failures
+            LOGGER.warn(e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     Override this to place result in non-standard location on document
+     @return the root document for "activity", otherwise the configured child; null when that child is absent or not an object
+     */
+    protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+        if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+            return rootDocument;
+        String field = this.configuration.getEntity().toString();
+        if( rootDocument.get(field) instanceof ObjectNode )
+            return (ObjectNode) rootDocument.get(field);
+        return null;
+    }
+
+    /**
+     Fetches the configured resource and stores the response on the datum under the configured extension
+     @return a list holding the enriched datum, or an empty list when any step fails
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newArrayList();
+        ObjectNode rootDocument = getRootDocument(entry);
+        if( rootDocument == null )
+            return result;
+
+        Map<String, String> params = prepareParams(entry);
+        URI uri;
+        try {
+            // work on a copy so per-datum parameters never pile up on the shared builder
+            URIBuilder requestBuilder = new URIBuilder(uriBuilder.build());
+            for( Map.Entry<String,String> param : params.entrySet())
+                requestBuilder.setParameter(param.getKey(), param.getValue());
+            uri = requestBuilder.build();
+        } catch (URISyntaxException e) {
+            LOGGER.error("URI error {}", uriBuilder.toString());
+            return result;
+        }
+
+        HttpGet httpget = prepareHttpGet(uri);
+        CloseableHttpResponse response = null;
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpget);
+            HttpEntity entity = response.getEntity();
+            // TODO: handle retry
+            if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+                entityString = EntityUtils.toString(entity);
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
+            return result;
+        } finally {
+            // response is still null when execute() failed; the client stays open until cleanUp()
+            if( response != null ) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                    LOGGER.warn("Could not close response: {}", e.getMessage());
+                }
+            }
+        }
+
+        if( entityString == null )
+            return result;
+        LOGGER.debug(entityString);
+
+        ObjectNode extensionFragment = prepareExtensionFragment(entityString);
+        ObjectNode extensionEntity = getEntityToExtend(rootDocument);
+        if( extensionFragment == null || extensionEntity == null ) {
+            LOGGER.warn("Nothing to extend for {}", uri);
+            return result;
+        }
+
+        ExtensionUtil.ensureExtensions(extensionEntity);
+        ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
+
+        entry.setDocument(rootDocument);
+        result.add(entry);
+        return result;
+    }
+
+    /** Override this to add parameters to the request */
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+        return Maps.newHashMap();
+    }
+
+    /** Creates the GET request with the configured content-type and, when set, the basic auth header */
+    public HttpGet prepareHttpGet(URI uri) {
+        HttpGet httpget = new HttpGet(uri);
+        httpget.addHeader("content-type", this.configuration.getContentType());
+        if( !Strings.isNullOrEmpty(authHeader))
+            httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+        return httpget;
+    }
+
+    /** Validates the configuration, then builds the base uri, the optional auth header and the client */
+    @Override
+    public void prepare(Object configurationObject) {
+        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+        Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
+        mapper = StreamsJacksonMapper.getInstance();
+
+        uriBuilder = new URIBuilder()
+            .setScheme(this.configuration.getProtocol())
+            .setHost(this.configuration.getHostname())
+            .setPath(this.configuration.getResourcePath());
+
+        if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
+            uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
+        if( !Strings.isNullOrEmpty(configuration.getUsername()) && !Strings.isNullOrEmpty(configuration.getPassword())) {
+            String credentials = configuration.getUsername() + ":" + configuration.getPassword();
+            // encode with an explicit charset rather than the platform default
+            authHeader = Base64.encodeBase64String(credentials.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+        httpclient = HttpClients.createDefault();
+    }
+
+    /** Releases the http client created in prepare(); does nothing when prepare() never ran */
+    @Override
+    public void cleanUp() {
+        LOGGER.info("shutting down SimpleHTTPGetProcessor");
+        try {
+            if( httpclient != null )
+                httpclient.close();
+        } catch (IOException e) {
+            LOGGER.warn("Could not close http client: {}", e.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
new file mode 100644
index 0000000..622225a
--- /dev/null
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
@@ -0,0 +1,215 @@
+package org.apache.streams.components.http.provider;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Provider retrieves contents from an known set of urls and passes all resulting objects downstream
+ */
+public class SimpleHTTPGetProvider implements StreamsProvider {
+
+    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+    // from root config id
+    private final static String EXTENSION = "account_type";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProvider.class);
+
+    protected ObjectMapper mapper;
+
+    protected URIBuilder uriBuilder;
+
+    protected CloseableHttpClient httpclient;
+
+    protected HttpProviderConfiguration configuration;
+
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    //    // authorized only
+//    //private PeoplePatternConfiguration peoplePatternConfiguration = null;
+//    //private String authHeader;
+//
+    public SimpleHTTPGetProvider() {
+        this(HttpConfigurator.detectProviderConfiguration(StreamsConfigurator.config.getConfig("http")));
+    }
+
+    public SimpleHTTPGetProvider(HttpProviderConfiguration providerConfiguration) {
+        LOGGER.info("creating SimpleHTTPGetProvider");
+        LOGGER.info(providerConfiguration.toString());
+        this.configuration = providerConfiguration;
+    }
+
+    /**
+      Override this to add parameters to the request
+     */
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+        return Maps.newHashMap();
+    }
+
+    public HttpGet prepareHttpGet(URI uri) {
+        HttpGet httpget = new HttpGet(uri);
+        httpget.addHeader("content-type", this.configuration.getContentType());
+        return httpget;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+//        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+//        Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
+
+        mapper = StreamsJacksonMapper.getInstance();
+
+        uriBuilder = new URIBuilder()
+            .setScheme(this.configuration.getProtocol())
+            .setHost(this.configuration.getHostname())
+            .setPath(this.configuration.getResourcePath());
+
+        httpclient = HttpClients.createDefault();
+    }
+
+    @Override
+    public void cleanUp() {
+
+        LOGGER.info("shutting down SimpleHTTPGetProvider");
+        try {
+            httpclient.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                httpclient.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            } finally {
+                httpclient = null;
+            }
+        }
+    }
+
+    @Override
+    public void startStream() {
+
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        StreamsResultSet current;
+
+        uriBuilder = uriBuilder.setPath(
+            Joiner.on("/").skipNulls().join(uriBuilder.getPath(), configuration.getResource(), configuration.getResourcePostfix())
+        );
+
+        URI uri;
+        try {
+            uri = uriBuilder.build();
+        } catch (URISyntaxException e) {
+            uri = null;
+        }
+
+        List<ObjectNode> results = executeGet(uri);
+
+        lock.writeLock().lock();
+
+        for( ObjectNode item : results ) {
+            providerQueue.add(new StreamsDatum(item, item.get("id").asText(), new DateTime(item.get("timestamp").asText())));
+        }
+
+        LOGGER.debug("Creating new result set for {} items", providerQueue.size());
+        current = new StreamsResultSet(providerQueue);
+
+        return current;
+    }
+
+    protected List<ObjectNode> executeGet(URI uri) {
+
+        Preconditions.checkNotNull(uri);
+
+        List<ObjectNode> results = new ArrayList<>();
+
+        HttpGet httpget = prepareHttpGet(uri);
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpget);
+            HttpEntity entity = response.getEntity();
+            // TODO: handle retry
+            if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+                entityString = EntityUtils.toString(entity);
+                if( !entityString.equals("{}") && !entityString.equals("[]") ) {
+                    JsonNode jsonNode = mapper.readValue(entityString, JsonNode.class);
+                    if (jsonNode != null && jsonNode instanceof ObjectNode ) {
+
+                        results.add((ObjectNode) jsonNode);
+                    } else if (jsonNode != null && jsonNode instanceof ArrayNode) {
+                        ArrayNode arrayNode = (ArrayNode) jsonNode;
+                        Iterator<JsonNode> iterator = arrayNode.elements();
+                        while (iterator.hasNext()) {
+                            ObjectNode element = (ObjectNode) iterator.next();
+
+                            results.add(element);
+                        }
+                    }
+                }
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {}
+        }
+        return results;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
new file mode 100644
index 0000000..b4dc243
--- /dev/null
+++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json
@@ -0,0 +1,50 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.components.http.HttpConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "Protocol",
+            "default": "http"
+        },
+        "hostname": {
+            "type": "string",
+            "description": "Hostname",
+            "required" : true
+        },
+        "port": {
+            "type": "integer",
+            "description": "Port",
+            "default": 80
+        },
+        "resourcePath": {
+            "type": "string",
+            "description": "Resource Path",
+            "required" : true
+        },
+        "content-type": {
+            "type": "string",
+            "description": "Resource content-type",
+            "required" : true,
+            "default": "application/json"
+        },
+        "access_token": {
+            "type": "string",
+            "description": "Known Access Token",
+            "required" : false
+        },
+        "username": {
+            "type": "string",
+            "description": "Basic Auth Username",
+            "required" : false
+        },
+        "password": {
+            "type": "string",
+            "description": "Basic Auth Password",
+            "required" : false
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
new file mode 100644
index 0000000..32e4c23
--- /dev/null
+++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProcessorConfiguration.json
@@ -0,0 +1,28 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": { "$ref": "HttpConfiguration.json" },
+    "properties": {
+        "entity": {
+            "type": "string",
+            "description": "Entity to extend",
+            "enum": [ "activity", "actor", "object", "target" ],
+            "required" : true,
+            "default": "activity"
+        },
+        "extension": {
+            "type": "string",
+            "description": "Extension identifier",
+            "required" : true
+        },
+        "urlField": {
+            "type": "string",
+            "description": "Field where url is located",
+            "required" : true,
+            "default": "url"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
new file mode 100644
index 0000000..2c135d9
--- /dev/null
+++ b/streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpProviderConfiguration.json
@@ -0,0 +1,18 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.components.http.HttpProviderConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": { "$ref": "HttpConfiguration.json" },
+    "properties": {
+        "resource": {
+            "type": "string",
+            "required" : false
+        },
+        "resourcePostfix": {
+            "type": "string",
+            "required" : false
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/README.md b/streams-components/streams-processor-http/README.md
deleted file mode 100644
index 62dd4c1..0000000
--- a/streams-components/streams-processor-http/README.md
+++ /dev/null
@@ -1,16 +0,0 @@
-streams-processor-http
-=====================
-
-Hit an http endpoint and place the result in extensions
-
-Example SimpleHTTPGetProcessor configuration:
-
-    "http": {
-        "protocol": "http",
-        "hostname": "urls.api.twitter.com",
-        "port": 9300,
-        "resourceUri": "1/urls/count.json"
-    }
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml
deleted file mode 100644
index d9215ad..0000000
--- a/streams-components/streams-processor-http/pom.xml
+++ /dev/null
@@ -1,154 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.streams</groupId>
-        <artifactId>streams-components</artifactId>
-        <version>0.1-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>streams-processor-http</artifactId>
-
-    <name>streams-processor-http</name>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.jsonschema2pojo</groupId>
-            <artifactId>jsonschema2pojo-core</artifactId>
-            <type>jar</type>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-config</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-pojo</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-pojo-extensions</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>4.3.5</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-            </resource>
-        </resources>
-        <testResources>
-            <testResource>
-                <directory>src/test/resources</directory>
-            </testResource>
-        </testResources>
-        <plugins>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <version>1.8</version>
-                <executions>
-                    <execution>
-                        <id>add-source</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>target/generated-sources/jsonschema2pojo</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>add-source-jaxb2</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>target/generated-sources/jaxb2</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.jsonschema2pojo</groupId>
-                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
-                <configuration>
-                    <addCompileSourceRoot>true</addCompileSourceRoot>
-                    <generateBuilders>true</generateBuilders>
-                    <sourcePaths>
-                        <sourcePath>src/main/jsonschema</sourcePath>
-                    </sourcePaths>
-                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
-                    <targetPackage>org.apache.streams.http</targetPackage>
-                    <useLongIntegers>true</useLongIntegers>
-                    <useJodaDates>true</useJodaDates>
-                    <includeJsr303Annotations>true</includeJsr303Annotations>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>generate</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
deleted file mode 100644
index 36801b8..0000000
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.components.http;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
- */
-public class HttpConfigurator {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class);
-
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-    public static HttpProcessorConfiguration detectConfiguration(Config config) {
-
-        HttpProcessorConfiguration httpProcessorConfiguration = null;
-
-        try {
-            httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse http configuration", e.getMessage());
-        }
-        return httpProcessorConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
deleted file mode 100644
index d74793a..0000000
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
+++ /dev/null
@@ -1,218 +0,0 @@
-package org.apache.streams.components.http;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.data.util.ExtensionUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.validation.Validation;
-import javax.validation.ValidatorFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class SimpleHTTPGetProcessor implements StreamsProcessor {
-
-    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
-
-    // from root config id
-    private final static String EXTENSION = "account_type";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
-
-    protected ObjectMapper mapper;
-
-    protected URIBuilder uriBuilder;
-
-    protected CloseableHttpClient httpclient;
-
-    protected HttpProcessorConfiguration configuration;
-//
-//    // authorized only
-//    //private PeoplePatternConfiguration peoplePatternConfiguration = null;
-//    //private String authHeader;
-//
-    public SimpleHTTPGetProcessor() {
-        LOGGER.info("creating SimpleHTTPGetProcessor");
-        this.configuration = HttpConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("http"));
-    }
-
-    public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
-        LOGGER.info("creating SimpleHTTPGetProcessor");
-        LOGGER.info(processorConfiguration.toString());
-        this.configuration = processorConfiguration;
-    }
-
-    /**
-      Override this to add parameters to the request
-     */
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-
-        return Maps.newHashMap();
-    }
-
-    /**
-     Override this to store a result other than exact json representation of response
-     */
-    protected ObjectNode prepareExtensionFragment(String entityString) {
-
-        try {
-            return mapper.readValue(entityString, ObjectNode.class);
-        } catch (IOException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        }
-    }
-
-    /**
-     Override this to place result in non-standard location on document
-     */
-    protected ObjectNode getRootDocument(StreamsDatum datum) {
-
-        try {
-            String json = datum.getDocument() instanceof String ?
-                    (String) datum.getDocument() :
-                    mapper.writeValueAsString(datum.getDocument());
-            return mapper.readValue(json, ObjectNode.class);
-        } catch (JsonProcessingException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        } catch (IOException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        }
-
-    }
-        /**
-         Override this to place result in non-standard location on document
-         */
-    protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
-
-        if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
-            return rootDocument;
-        else
-            return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
-
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = Lists.newArrayList();
-
-        ObjectNode rootDocument = getRootDocument(entry);
-
-        Map<String, String> params = prepareParams(entry);
-
-        URI uri;
-        for( Map.Entry<String,String> param : params.entrySet()) {
-            uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
-        }
-        try {
-            uri = uriBuilder.build();
-        } catch (URISyntaxException e) {
-            LOGGER.error("URI error {}", uriBuilder.toString());
-            return result;
-        }
-
-        HttpGet httpget = prepareHttpGet(uri);
-
-        CloseableHttpResponse response = null;
-
-        String entityString = null;
-        try {
-            response = httpclient.execute(httpget);
-            HttpEntity entity = response.getEntity();
-            // TODO: handle retry
-            if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
-                entityString = EntityUtils.toString(entity);
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
-            return result;
-        } finally {
-            try {
-                response.close();
-            } catch (IOException e) {}
-            try {
-                httpclient.close();
-            } catch (IOException e) {}
-        }
-
-        if( entityString == null )
-            return result;
-
-        LOGGER.debug(entityString);
-
-        ObjectNode extensionFragment = prepareExtensionFragment(entityString);
-
-        ObjectNode extensionEntity = getEntityToExtend(rootDocument);
-
-        ExtensionUtil.ensureExtensions(extensionEntity);
-
-        ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
-
-        entry.setDocument(rootDocument);
-
-        result.add(entry);
-
-        return result;
-
-    }
-
-    public HttpGet prepareHttpGet(URI uri) {
-        HttpGet httpget = new HttpGet(uri);
-        httpget.addHeader("content-type", this.configuration.getContentType());
-        return httpget;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-        Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
-
-        mapper = StreamsJacksonMapper.getInstance();
-
-        uriBuilder = new URIBuilder()
-            .setScheme(this.configuration.getProtocol())
-            .setHost(this.configuration.getHostname())
-            .setPath(this.configuration.getResourceUri());
-
-        httpclient = HttpClients.createDefault();
-    }
-
-    @Override
-    public void cleanUp() {
-        LOGGER.info("shutting down SimpleHTTPGetProcessor");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d62061de/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
deleted file mode 100644
index 40c3bcd..0000000
--- a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
+++ /dev/null
@@ -1,47 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "protocol": {
-            "type": "string",
-            "description": "Protocol",
-            "default": "http"
-        },
-        "hostname": {
-            "type": "string",
-            "description": "Hostname",
-            "required" : true
-        },
-        "port": {
-            "type": "integer",
-            "description": "Port",
-            "default": 80
-        },
-        "resourceUri": {
-            "type": "string",
-            "description": "Resource URI",
-            "required" : true
-        },
-        "content-type": {
-            "type": "string",
-            "description": "Resource URI",
-            "required" : true,
-            "default": "application/json"
-        },
-        "entity": {
-            "type": "string",
-            "description": "Entity to extend",
-            "enum": [ "activity", "actor", "object", "target" ],
-            "required" : true,
-            "default": "activity"
-        },
-        "extension": {
-            "type": "string",
-            "description": "Extension identifier",
-            "required" : true
-        }
-    }
-}
\ No newline at end of file


[03/12] incubator-streams git commit: added HttpConfigurator

Posted by sb...@apache.org.
added HttpConfigurator


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b8ccf9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b8ccf9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b8ccf9f6

Branch: refs/heads/master
Commit: b8ccf9f6869c64f1f6e25f668942ea0de9fec2eb
Parents: 34232ad
Author: sblackmon <sb...@apache.org>
Authored: Mon Sep 15 18:38:09 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Sep 15 18:38:09 2014 -0500

----------------------------------------------------------------------
 .../components/http/HttpConfigurator.java       | 53 ++++++++++++++++++++
 .../components/http/SimpleHTTPGetProcessor.java | 23 +++++----
 .../processor/TwitterUrlApiProcessor.java       |  8 +++
 3 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
new file mode 100644
index 0000000..36801b8
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
+ */
+public class HttpConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class);
+
+    private final static ObjectMapper mapper = new ObjectMapper();
+
+    public static HttpProcessorConfiguration detectConfiguration(Config config) {
+
+        HttpProcessorConfiguration httpProcessorConfiguration = null;
+
+        try {
+            httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse http configuration", e.getMessage());
+        }
+        return httpProcessorConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
index d76d839..dec9d03 100644
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
@@ -19,6 +19,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.data.util.ActivityUtil;
@@ -34,6 +35,7 @@ import javax.validation.ValidatorFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -59,6 +61,11 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
 //    //private PeoplePatternConfiguration peoplePatternConfiguration = null;
 //    //private String authHeader;
 //
+    public SimpleHTTPGetProcessor() {
+        LOGGER.info("creating SimpleHTTPGetProcessor");
+        this.configuration = HttpConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("http"));
+    }
+
     public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
         LOGGER.info("creating SimpleHTTPGetProcessor");
         LOGGER.info(processorConfiguration.toString());
@@ -137,9 +144,7 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
             return result;
         }
 
-        HttpGet httpget = new HttpGet(uri);
-        httpget.addHeader("content-type", this.configuration.getContentType());
-        //httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+        HttpGet httpget = prepareHttpGet(uri);
 
         CloseableHttpResponse response = null;
 
@@ -184,6 +189,12 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
 
     }
 
+    public HttpGet prepareHttpGet(URI uri) {
+        HttpGet httpget = new HttpGet(uri);
+        httpget.addHeader("content-type", this.configuration.getContentType());
+        return httpget;
+    }
+
     @Override
     public void prepare(Object configurationObject) {
 
@@ -198,12 +209,6 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
             .setPath(this.configuration.getResourceUri());
 
         httpclient = HttpClients.createDefault();
-        //  StringBuilder stringBuilder = new StringBuilder();
-//        stringBuilder.append(peoplePatternConfiguration.getUsername());
-//        stringBuilder.append(":");
-//        stringBuilder.append(peoplePatternConfiguration.getPassword());
-//        String string = stringBuilder.toString();
-//        authHeader = Base64.encodeBase64String(string.getBytes());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 77965c4..438937f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -5,6 +5,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
 import org.apache.streams.components.http.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
@@ -17,6 +18,13 @@ import java.util.Map;
  */
 public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
 
+    public TwitterUrlApiProcessor() {
+        super();
+        this.configuration.setHostname("urls.api.twitter.com");
+        this.configuration.setResourceUri("/1/urls/count.json");
+        this.configuration.setExtension("twitter_url_count");
+    }
+
     public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
         super(processorConfiguration);
         this.configuration.setHostname("urls.api.twitter.com");


[02/12] incubator-streams git commit: added streams-components module added streams-processor-http module activated streams-pojo-extensions module tweaks to pojo, require id and remove non-sensical defaults

Posted by sb...@apache.org.
added streams-components module
added streams-processor-http module
activated streams-pojo-extensions module
tweaks to pojo, require id and remove non-sensical defaults


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/34232ad8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/34232ad8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/34232ad8

Branch: refs/heads/master
Commit: 34232ad87b50253913fba0dabb6fa9af591f388d
Parents: 1464819
Author: sblackmon <sb...@apache.org>
Authored: Mon Sep 15 13:50:32 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Sep 15 13:50:32 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |   2 +
 .../streams-processor-http/README.md            |  16 ++
 .../streams-processor-http/pom.xml              |  42 +++-
 .../components/http/SimpleHTTPGetProcessor.java | 213 +++++++++++++++++++
 .../HttpProcessorConfiguration.json             |  47 ++++
 .../api/FacebookPostActivitySerializer.java     |   1 -
 .../streams-provider-twitter/pom.xml            |   5 +
 .../processor/TwitterUrlApiProcessor.java       |  44 ++++
 streams-pojo-extensions/pom.xml                 |  64 ++++++
 .../apache/streams/data/util/ExtensionUtil.java |  94 ++++++++
 .../apache/streams/data/util/ActivityUtil.java  |  14 +-
 .../org/apache/streams/pojo/json/activity.json  |   3 +-
 .../org/apache/streams/pojo/json/object.json    |   2 +-
 13 files changed, 537 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c2e1766..59fa634 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,9 @@
         <module>streams-osgi-components</module>
         <module>streams-core</module>
         <module>streams-config</module>
+        <module>streams-components</module>
         <module>streams-pojo</module>
+        <module>streams-pojo-extensions</module>
         <module>streams-util</module>
         <module>streams-contrib</module>
         <module>streams-runtimes</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/README.md b/streams-components/streams-processor-http/README.md
new file mode 100644
index 0000000..62dd4c1
--- /dev/null
+++ b/streams-components/streams-processor-http/README.md
@@ -0,0 +1,16 @@
+streams-processor-http
+=====================
+
+Hit an http endpoint and place the result in extensions
+
+Example SimpleHTTPGetProcessor configuration:
+
+    "http": {
+        "protocol": "http",
+        "hostname": "urls.api.twitter.com",
+        "port": 80,
+        "resourceUri": "1/urls/count.json"
+    }
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml
index 67538aa..d9215ad 100644
--- a/streams-components/streams-processor-http/pom.xml
+++ b/streams-components/streams-processor-http/pom.xml
@@ -22,7 +22,7 @@
 
     <parent>
         <groupId>org.apache.streams</groupId>
-        <artifactId>streams-project</artifactId>
+        <artifactId>streams-components</artifactId>
         <version>0.1-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
@@ -40,6 +40,45 @@
             <scope>compile</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo-extensions</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.3.5</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -100,6 +139,7 @@
                     <targetPackage>org.apache.streams.http</targetPackage>
                     <useLongIntegers>true</useLongIntegers>
                     <useJodaDates>true</useJodaDates>
+                    <includeJsr303Annotations>true</includeJsr303Annotations>
                 </configuration>
                 <executions>
                     <execution>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
new file mode 100644
index 0000000..d76d839
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
@@ -0,0 +1,213 @@
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.Validation;
+import javax.validation.ValidatorFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+    // from root config id
+    private final static String EXTENSION = "account_type";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
+
+    protected ObjectMapper mapper;
+
+    protected URIBuilder uriBuilder;
+
+    protected CloseableHttpClient httpclient;
+
+    protected HttpProcessorConfiguration configuration;
+//
+//    // authorized only
+//    //private PeoplePatternConfiguration peoplePatternConfiguration = null;
+//    //private String authHeader;
+//
+    public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
+        LOGGER.info("creating SimpleHTTPGetProcessor");
+        LOGGER.info(processorConfiguration.toString());
+        this.configuration = processorConfiguration;
+    }
+
+    /**
+      Override this to add parameters to the request
+     */
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+        return Maps.newHashMap();
+    }
+
+    /**
+     Override this to store a result other than exact json representation of response
+     */
+    protected ObjectNode prepareExtensionFragment(String entityString) {
+
+        try {
+            return mapper.readValue(entityString, ObjectNode.class);
+        } catch (IOException e) {
+            LOGGER.warn(e.getMessage());
+            return null;
+        }
+    }
+
+    /**
+     Override this to place result in non-standard location on document
+     */
+    protected ObjectNode getRootDocument(StreamsDatum datum) {
+
+        try {
+            String json = datum.getDocument() instanceof String ?
+                    (String) datum.getDocument() :
+                    mapper.writeValueAsString(datum.getDocument());
+            return mapper.readValue(json, ObjectNode.class);
+        } catch (JsonProcessingException e) {
+            LOGGER.warn(e.getMessage());
+            return null;
+        } catch (IOException e) {
+            LOGGER.warn(e.getMessage());
+            return null;
+        }
+
+    }
+        /**
+         Override this to place result in non-standard location on document
+         */
+    protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+        if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+            return rootDocument;
+        else
+            return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
+
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        ObjectNode rootDocument = getRootDocument(entry);
+
+        Map<String, String> params = prepareParams(entry);
+
+        URI uri;
+        for( Map.Entry<String,String> param : params.entrySet()) {
+            uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
+        }
+        try {
+            uri = uriBuilder.build();
+        } catch (URISyntaxException e) {
+            LOGGER.error("URI error {}", uriBuilder.toString());
+            return result;
+        }
+
+        HttpGet httpget = new HttpGet(uri);
+        httpget.addHeader("content-type", this.configuration.getContentType());
+        //httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpget);
+            HttpEntity entity = response.getEntity();
+            // TODO: handle rate-limiting
+            if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+                entityString = EntityUtils.toString(entity);
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
+            return result;
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {}
+            try {
+                httpclient.close();
+            } catch (IOException e) {}
+        }
+
+        if( entityString == null )
+            return result;
+
+        LOGGER.debug(entityString);
+
+        ObjectNode extensionFragment = prepareExtensionFragment(entityString);
+
+        ObjectNode extensionEntity = getEntityToExtend(rootDocument);
+
+        ExtensionUtil.ensureExtensions(extensionEntity);
+
+        ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
+
+        entry.setDocument(extensionEntity);
+
+        result.add(entry);
+
+        return result;
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+        Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
+
+        mapper = StreamsJacksonMapper.getInstance();
+
+        uriBuilder = new URIBuilder()
+            .setScheme(this.configuration.getProtocol())
+            .setHost(this.configuration.getHostname())
+            .setPath(this.configuration.getResourceUri());
+
+        httpclient = HttpClients.createDefault();
+        //  StringBuilder stringBuilder = new StringBuilder();
+//        stringBuilder.append(peoplePatternConfiguration.getUsername());
+//        stringBuilder.append(":");
+//        stringBuilder.append(peoplePatternConfiguration.getPassword());
+//        String string = stringBuilder.toString();
+//        authHeader = Base64.encodeBase64String(string.getBytes());
+    }
+
+    @Override
+    public void cleanUp() {
+        LOGGER.info("shutting down SimpleHTTPGetProcessor");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
new file mode 100644
index 0000000..40c3bcd
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
@@ -0,0 +1,47 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "Protocol",
+            "default": "http"
+        },
+        "hostname": {
+            "type": "string",
+            "description": "Hostname",
+            "required" : true
+        },
+        "port": {
+            "type": "integer",
+            "description": "Port",
+            "default": 80
+        },
+        "resourceUri": {
+            "type": "string",
+            "description": "Resource URI",
+            "required" : true
+        },
+        "content-type": {
+            "type": "string",
+            "description": "Content-Type header sent with the request",
+            "required" : true,
+            "default": "application/json"
+        },
+        "entity": {
+            "type": "string",
+            "description": "Entity to extend",
+            "enum": [ "activity", "actor", "object", "target" ],
+            "required" : true,
+            "default": "activity"
+        },
+        "extension": {
+            "type": "string",
+            "description": "Extension identifier",
+            "required" : true
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
index aa718fb..de39262 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
@@ -78,7 +78,6 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
         setProvider(activity);
         setObjectType(post.getType(), activity);
         parseObject(activity, mapper.convertValue(post, ObjectNode.class));
-        fixObjectId(activity);
         fixContentFromSummary(activity);
         activity.setVerb("post");
         List<String> links = Lists.newLinkedList();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 79d1608..f0d65f8 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -54,6 +54,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-processor-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
new file mode 100644
index 0000000..77965c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -0,0 +1,44 @@
+package org.apache.streams.twitter.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.SimpleHTTPGetProcessor;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Stores the Twitter url-count API response for an activity's url as an extension.
+ */
+public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
+
+    public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
+        super(processorConfiguration);
+        this.configuration.setHostname("urls.api.twitter.com");
+        this.configuration.setResourceUri("/1/urls/count.json");
+        this.configuration.setExtension("twitter_url_count");
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+        // inspect the document itself; the datum wrapper is not an Activity
+        Activity activity = (Activity) entry.getDocument();
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(activity.getUrl()));
+        return super.process(entry);
+    }
+
+    /** Supplies the activity url as the "url" query parameter. */
+    @Override
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+        Map<String, String> params = Maps.newHashMap();
+        // convert the document, not the datum wrapper, to read the url
+        params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getUrl());
+        return params;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/pom.xml b/streams-pojo-extensions/pom.xml
new file mode 100644
index 0000000..7f3f1a5
--- /dev/null
+++ b/streams-pojo-extensions/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-project</artifactId>
+        <version>0.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>streams-pojo-extensions</artifactId>
+
+    <name>streams-pojo-extensions</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
new file mode 100644
index 0000000..a8d068a
--- /dev/null
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -0,0 +1,94 @@
+package org.apache.streams.data.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import java.util.Map;
+
+public class ExtensionUtil {
+
+    /**
+     * Property on the activity object to use for extensions
+     */
+    public static final String EXTENSION_PROPERTY = "extensions";
+    /**
+     * The number of +1, Like, favorites, etc that the post has received
+     */
+    public static final String LIKES_EXTENSION = "likes";
+    /**
+     * The number of retweets, shares, etc that the post has received
+     */
+    public static final String REBROADCAST_EXTENSION = "rebroadcasts";
+    /**
+     * The language of the post
+     */
+    public static final String LANGUAGE_EXTENSION = "language";
+    /**
+     * Location that the post was made or the actor's residence
+     */
+    public static final String LOCATION_EXTENSION = "location";
+    /**
+     * Country that the post was made
+     */
+    public static final String LOCATION_EXTENSION_COUNTRY = "country";
+    /**
+     * Specific JSON-geo coordinates (long,lat)
+     */
+    public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
+
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /** Returns the extensions node of the object, optionally creating it in place. */
+    private static ObjectNode extensionsNode(ObjectNode object, boolean create) {
+        if (object.get(EXTENSION_PROPERTY) instanceof ObjectNode)
+            return (ObjectNode) object.get(EXTENSION_PROPERTY);
+        return create ? object.putObject(EXTENSION_PROPERTY) : null;
+    }
+
+    /** Returns a copy of the extensions as a map, or null when there are none. */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> getExtensions(ObjectNode object) {
+        ObjectNode node = extensionsNode(object, false);
+        return node == null ? null : mapper.convertValue(node, Map.class);
+    }
+
+    /** Returns one extension value, or null when it is not present. */
+    public static Object getExtension(ObjectNode object, String key) {
+        Map<String, Object> extensions = getExtensions(object);
+        return extensions == null ? null : extensions.get(key);
+    }
+
+    /** Replaces the extensions stored on the given node itself. */
+    public static void setExtensions(ObjectNode object, Map<String, Object> extensions) {
+        object.set(EXTENSION_PROPERTY, mapper.convertValue(extensions, ObjectNode.class));
+    }
+
+    /** Adds one extension to the given node, creating the extensions property if needed. */
+    public static void addExtension(ObjectNode object, String key, Object extension) {
+        // store the value as a tree node inside the document that was passed in
+        extensionsNode(object, true).set(key,
+                mapper.convertValue(extension, com.fasterxml.jackson.databind.JsonNode.class));
+    }
+
+    /** Adds every entry of the map as an extension on the given node. */
+    public static void addExtensions(ObjectNode object, Map<String, Object> extensions) {
+        for( Map.Entry<String, Object> item : extensions.entrySet())
+            addExtension(object, item.getKey(), item.getValue());
+    }
+
+    /** Removes one extension from the given node, if it is present. */
+    public static void removeExtension(ObjectNode object, String key) {
+        ObjectNode node = extensionsNode(object, false);
+        if (node != null)
+            node.remove(key);
+    }
+
+    /** Creates the extensions property on the node if missing; returns a copy of it as a map. */
+    public static Map<String, Object> ensureExtensions(ObjectNode object) {
+        extensionsNode(object, true);
+        return getExtensions(object);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index 3684b32..04ee923 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -19,6 +19,7 @@
 package org.apache.streams.data.util;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
@@ -61,7 +62,7 @@ public class ActivityUtil {
      */
     public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     /**
      * Creates a standard extension property
@@ -69,13 +70,14 @@ public class ActivityUtil {
      * @return the Map representing the extensions property
      */
     @SuppressWarnings("unchecked")
+    @Deprecated
     public static Map<String, Object> ensureExtensions(Activity activity) {
-        Map<String, Object> properties = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
-        if(properties == null) {
-            properties = new HashMap<String, Object>();
-            activity.setAdditionalProperty(EXTENSION_PROPERTY, properties);
+        Map<String, Object> extensions = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
+        if(extensions == null) {
+            extensions = new HashMap<String, Object>();
+            activity.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
         }
-        return properties;
+        return extensions;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
index 45c2276..a68ce00 100644
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
+++ b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
@@ -8,7 +8,8 @@
   "properties": {
     "id" :{
       "type" : "string",
-      "description" : "Uniquely identifies each activity within the service"
+      "description" : "Uniquely identifies each activity within the service",
+      "required" : true
     },
     "actor" : {
       "type": "object",

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
index d51db27..eec09a8 100644
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
+++ b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
@@ -8,7 +8,7 @@
     "id" : {
       "type" : "string",
       "description" : "Provides a permanent, universally unique identifier for the object in the form of an absolute IRI [RFC3987]. An object SHOULD contain a single id property. If an object does not contain an id property, consumers MAY use the value of the url property as a less-reliable, non-unique identifier.",
-      "default" : "{link}"
+      "required" : true
     },
     "image" : {
       "format":"image",