You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/09/16 01:52:30 UTC
[1/3] git commit: just pons
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-168 [created] b8ccf9f68
just pons
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1464819f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1464819f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1464819f
Branch: refs/heads/STREAMS-168
Commit: 1464819fd7ea147830a4211ea2ac6af4aa715022
Parents: 35a8fbf
Author: sblackmon <sb...@apache.org>
Authored: Sun Sep 14 10:24:47 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Sun Sep 14 10:24:47 2014 -0500
----------------------------------------------------------------------
streams-components/pom.xml | 62 ++++++++++
.../streams-processor-http/pom.xml | 114 +++++++++++++++++++
2 files changed, 176 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1464819f/streams-components/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/pom.xml b/streams-components/pom.xml
new file mode 100644
index 0000000..26384b1
--- /dev/null
+++ b/streams-components/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-project</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-components</artifactId>
+
+ <packaging>pom</packaging>
+ <name>streams-components</name>
+
+ <properties>
+
+ </properties>
+
+ <modules>
+ <module>streams-processor-http</module>
+ </modules>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1464819f/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml
new file mode 100644
index 0000000..67538aa
--- /dev/null
+++ b/streams-components/streams-processor-http/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-project</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>streams-processor-http</artifactId>
+
+ <name>streams-processor-http</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-source-jaxb2</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jaxb2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.http</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
[2/3] git commit: added streams-components module added
streams-processor-http module activated streams-pojo-extensions module tweaks
to pojo, require id and remove non-sensical defaults
Posted by sb...@apache.org.
added streams-components module
added streams-processor-http module
activated streams-pojo-extensions module
tweaks to pojo, require id and remove non-sensical defaults
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/34232ad8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/34232ad8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/34232ad8
Branch: refs/heads/STREAMS-168
Commit: 34232ad87b50253913fba0dabb6fa9af591f388d
Parents: 1464819
Author: sblackmon <sb...@apache.org>
Authored: Mon Sep 15 13:50:32 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Sep 15 13:50:32 2014 -0500
----------------------------------------------------------------------
pom.xml | 2 +
.../streams-processor-http/README.md | 16 ++
.../streams-processor-http/pom.xml | 42 +++-
.../components/http/SimpleHTTPGetProcessor.java | 213 +++++++++++++++++++
.../HttpProcessorConfiguration.json | 47 ++++
.../api/FacebookPostActivitySerializer.java | 1 -
.../streams-provider-twitter/pom.xml | 5 +
.../processor/TwitterUrlApiProcessor.java | 44 ++++
streams-pojo-extensions/pom.xml | 64 ++++++
.../apache/streams/data/util/ExtensionUtil.java | 94 ++++++++
.../apache/streams/data/util/ActivityUtil.java | 14 +-
.../org/apache/streams/pojo/json/activity.json | 3 +-
.../org/apache/streams/pojo/json/object.json | 2 +-
13 files changed, 537 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c2e1766..59fa634 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,9 @@
<module>streams-osgi-components</module>
<module>streams-core</module>
<module>streams-config</module>
+ <module>streams-components</module>
<module>streams-pojo</module>
+ <module>streams-pojo-extensions</module>
<module>streams-util</module>
<module>streams-contrib</module>
<module>streams-runtimes</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/README.md
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/README.md b/streams-components/streams-processor-http/README.md
new file mode 100644
index 0000000..62dd4c1
--- /dev/null
+++ b/streams-components/streams-processor-http/README.md
@@ -0,0 +1,16 @@
+streams-processor-http
+=====================
+
+Hit an http endpoint and place the result in extensions
+
+Example SimpleHTTPGetProcessor configuration:
+
+ "http": {
+ "protocol": "http",
+ "hostname": "urls.api.twitter.com",
+ "port": 9300,
+ "resourceUri": "1/urls/count.json"
+ }
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/pom.xml b/streams-components/streams-processor-http/pom.xml
index 67538aa..d9215ad 100644
--- a/streams-components/streams-processor-http/pom.xml
+++ b/streams-components/streams-processor-http/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.streams</groupId>
- <artifactId>streams-project</artifactId>
+ <artifactId>streams-components</artifactId>
<version>0.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -40,6 +40,45 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo-extensions</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.3.5</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
</dependencies>
<build>
@@ -100,6 +139,7 @@
<targetPackage>org.apache.streams.http</targetPackage>
<useLongIntegers>true</useLongIntegers>
<useJodaDates>true</useJodaDates>
+ <includeJsr303Annotations>true</includeJsr303Annotations>
</configuration>
<executions>
<execution>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
new file mode 100644
index 0000000..d76d839
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
@@ -0,0 +1,213 @@
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.Validation;
+import javax.validation.ValidatorFactory;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
+
+ private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+
+ // from root config id
+ private final static String EXTENSION = "account_type";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
+
+ protected ObjectMapper mapper;
+
+ protected URIBuilder uriBuilder;
+
+ protected CloseableHttpClient httpclient;
+
+ protected HttpProcessorConfiguration configuration;
+//
+// // authorized only
+// //private PeoplePatternConfiguration peoplePatternConfiguration = null;
+// //private String authHeader;
+//
+ public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
+ LOGGER.info("creating SimpleHTTPGetProcessor");
+ LOGGER.info(processorConfiguration.toString());
+ this.configuration = processorConfiguration;
+ }
+
+ /**
+ Override this to add parameters to the request
+ */
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+ return Maps.newHashMap();
+ }
+
+ /**
+ Override this to store a result other than exact json representation of response
+ */
+ protected ObjectNode prepareExtensionFragment(String entityString) {
+
+ try {
+ return mapper.readValue(entityString, ObjectNode.class);
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ }
+ }
+
+ /**
+ Override this to place result in non-standard location on document
+ */
+ protected ObjectNode getRootDocument(StreamsDatum datum) {
+
+ try {
+ String json = datum.getDocument() instanceof String ?
+ (String) datum.getDocument() :
+ mapper.writeValueAsString(datum.getDocument());
+ return mapper.readValue(json, ObjectNode.class);
+ } catch (JsonProcessingException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
+ return null;
+ }
+
+ }
+ /**
+ Override this to place result in non-standard location on document
+ */
+ protected ObjectNode getEntityToExtend(ObjectNode rootDocument) {
+
+ if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
+ return rootDocument;
+ else
+ return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString());
+
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ ObjectNode rootDocument = getRootDocument(entry);
+
+ Map<String, String> params = prepareParams(entry);
+
+ URI uri;
+ for( Map.Entry<String,String> param : params.entrySet()) {
+ uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
+ }
+ try {
+ uri = uriBuilder.build();
+ } catch (URISyntaxException e) {
+ LOGGER.error("URI error {}", uriBuilder.toString());
+ return result;
+ }
+
+ HttpGet httpget = new HttpGet(uri);
+ httpget.addHeader("content-type", this.configuration.getContentType());
+ //httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+
+ CloseableHttpResponse response = null;
+
+ String entityString = null;
+ try {
+ response = httpclient.execute(httpget);
+ HttpEntity entity = response.getEntity();
+ // TODO: handle rate-limiting
+ if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+ entityString = EntityUtils.toString(entity);
+ }
+ } catch (IOException e) {
+ LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e.getMessage());
+ return result;
+ } finally {
+ try {
+ response.close();
+ } catch (IOException e) {}
+ try {
+ httpclient.close();
+ } catch (IOException e) {}
+ }
+
+ if( entityString == null )
+ return result;
+
+ LOGGER.debug(entityString);
+
+ ObjectNode extensionFragment = prepareExtensionFragment(entityString);
+
+ ObjectNode extensionEntity = getEntityToExtend(rootDocument);
+
+ ExtensionUtil.ensureExtensions(extensionEntity);
+
+ ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment);
+
+ entry.setDocument(extensionEntity);
+
+ result.add(entry);
+
+ return result;
+
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+ Preconditions.checkArgument(factory.getValidator().validate(this.configuration, HttpProcessorConfiguration.class).size() == 0);
+
+ mapper = StreamsJacksonMapper.getInstance();
+
+ uriBuilder = new URIBuilder()
+ .setScheme(this.configuration.getProtocol())
+ .setHost(this.configuration.getHostname())
+ .setPath(this.configuration.getResourceUri());
+
+ httpclient = HttpClients.createDefault();
+ // StringBuilder stringBuilder = new StringBuilder();
+// stringBuilder.append(peoplePatternConfiguration.getUsername());
+// stringBuilder.append(":");
+// stringBuilder.append(peoplePatternConfiguration.getPassword());
+// String string = stringBuilder.toString();
+// authHeader = Base64.encodeBase64String(string.getBytes());
+ }
+
+ @Override
+ public void cleanUp() {
+ LOGGER.info("shutting down SimpleHTTPGetProcessor");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
new file mode 100644
index 0000000..40c3bcd
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/jsonschema/org/apache/streams/elasticsearch/HttpProcessorConfiguration.json
@@ -0,0 +1,47 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.components.http.HttpProcessorConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "protocol": {
+ "type": "string",
+ "description": "Protocol",
+ "default": "http"
+ },
+ "hostname": {
+ "type": "string",
+ "description": "Hostname",
+ "required" : true
+ },
+ "port": {
+ "type": "integer",
+ "description": "Port",
+ "default": 80
+ },
+ "resourceUri": {
+ "type": "string",
+ "description": "Resource URI",
+ "required" : true
+ },
+ "content-type": {
+ "type": "string",
+ "description": "Resource URI",
+ "required" : true,
+ "default": "application/json"
+ },
+ "entity": {
+ "type": "string",
+ "description": "Entity to extend",
+ "enum": [ "activity", "actor", "object", "target" ],
+ "required" : true,
+ "default": "activity"
+ },
+ "extension": {
+ "type": "string",
+ "description": "Extension identifier",
+ "required" : true
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
index aa718fb..de39262 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
@@ -78,7 +78,6 @@ public class FacebookPostActivitySerializer implements ActivitySerializer<org.ap
setProvider(activity);
setObjectType(post.getType(), activity);
parseObject(activity, mapper.convertValue(post, ObjectNode.class));
- fixObjectId(activity);
fixContentFromSummary(activity);
activity.setVerb("post");
List<String> links = Lists.newLinkedList();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 79d1608..f0d65f8 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -54,6 +54,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-processor-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
new file mode 100644
index 0000000..77965c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -0,0 +1,44 @@
+package org.apache.streams.twitter.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.SimpleHTTPGetProcessor;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 9/14/14.
+ */
+public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
+
+ public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
+ super(processorConfiguration);
+ this.configuration.setHostname("urls.api.twitter.com");
+ this.configuration.setResourceUri("/1/urls/count.json");
+ this.configuration.setExtension("twitter_url_count");
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ Preconditions.checkArgument(entry.getDocument() instanceof Activity);
+ Activity activity = mapper.convertValue(entry, Activity.class);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(activity.getUrl()));
+ return super.process(entry);
+ }
+
+ @Override
+ protected Map<String, String> prepareParams(StreamsDatum entry) {
+
+ Map<String, String> params = Maps.newHashMap();
+
+ params.put("url", mapper.convertValue(entry, Activity.class).getUrl());
+
+ return params;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/pom.xml b/streams-pojo-extensions/pom.xml
new file mode 100644
index 0000000..7f3f1a5
--- /dev/null
+++ b/streams-pojo-extensions/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-project</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>streams-pojo-extensions</artifactId>
+
+ <name>streams-pojo-extensions</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
new file mode 100644
index 0000000..a8d068a
--- /dev/null
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java
@@ -0,0 +1,94 @@
+package org.apache.streams.data.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import java.util.Map;
+
+public class ExtensionUtil {
+
+ /**
+ * Property on the activity object to use for extensions
+ */
+ public static final String EXTENSION_PROPERTY = "extensions";
+ /**
+ * The number of +1, Like, favorites, etc that the post has received
+ */
+ public static final String LIKES_EXTENSION = "likes";
+ /**
+ * The number of retweets, shares, etc that the post has received
+ */
+ public static final String REBROADCAST_EXTENSION = "rebroadcasts";
+ /**
+ * The language of the post
+ */
+ public static final String LANGUAGE_EXTENSION = "language";
+ /**
+ * Location that the post was made or the actor's residence
+ */
+ public static final String LOCATION_EXTENSION = "location";
+ /**
+ * Country that the post was made
+ */
+ public static final String LOCATION_EXTENSION_COUNTRY = "country";
+ /**
+ * Specific JSON-geo coordinates (long,lat)
+ */
+ public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
+
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ public static Map<String, Object> getExtensions(ObjectNode object) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ return extensions;
+ }
+
+ public static Object getExtension(ObjectNode object, String key) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ return extensions.get(key);
+ }
+
+ public static void setExtensions(ObjectNode object, Map<String, Object> extensions) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ activityObject.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
+ };
+
+ public static void addExtension(ObjectNode object, String key, Object extension) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ extensions.put(key, extension);
+ };
+
+ public static void addExtensions(ObjectNode object, Map<String, Object> extensions) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ for( Map.Entry<String, Object> item : extensions.entrySet())
+ activityObject.getAdditionalProperties().put(item.getKey(), item.getValue());
+ };
+
+ public static void removeExtension(ObjectNode object, String key) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ extensions.remove(key);
+ };
+
+ /**
+ * Creates a standard extension property
+ * @param object objectnode to create the property in
+ * @return the Map representing the extensions property
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> ensureExtensions(ObjectNode object) {
+ ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class);
+ Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ if(extensions == null) {
+ extensions = Maps.newHashMap();
+ setExtensions(object, extensions);
+ }
+ return getExtensions(object);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index 3684b32..04ee923 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -19,6 +19,7 @@
package org.apache.streams.data.util;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
@@ -61,7 +62,7 @@ public class ActivityUtil {
*/
public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
/**
* Creates a standard extension property
@@ -69,13 +70,14 @@ public class ActivityUtil {
* @return the Map representing the extensions property
*/
@SuppressWarnings("unchecked")
+ @Deprecated
public static Map<String, Object> ensureExtensions(Activity activity) {
- Map<String, Object> properties = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
- if(properties == null) {
- properties = new HashMap<String, Object>();
- activity.setAdditionalProperty(EXTENSION_PROPERTY, properties);
+ Map<String, Object> extensions = (Map)activity.getAdditionalProperties().get(EXTENSION_PROPERTY);
+ if(extensions == null) {
+ extensions = new HashMap<String, Object>();
+ activity.setAdditionalProperty(EXTENSION_PROPERTY, extensions);
}
- return properties;
+ return extensions;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
index 45c2276..a68ce00 100644
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
+++ b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/activity.json
@@ -8,7 +8,8 @@
"properties": {
"id" :{
"type" : "string",
- "description" : "Uniquely identifies each activity within the service"
+ "description" : "Uniquely identifies each activity within the service",
+ "required" : true
},
"actor" : {
"type": "object",
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34232ad8/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
index d51db27..eec09a8 100644
--- a/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
+++ b/streams-pojo/src/main/jsonschema/org/apache/streams/pojo/json/object.json
@@ -8,7 +8,7 @@
"id" : {
"type" : "string",
"description" : "Provides a permanent, universally unique identifier for the object in the form of an absolute IRI [RFC3987]. An object SHOULD contain a single id property. If an object does not contain an id property, consumers MAY use the value of the url property as a less-reliable, non-unique identifier.",
- "default" : "{link}"
+ "required" : true
},
"image" : {
"format":"image",
[3/3] git commit: added HttpConfigurator
Posted by sb...@apache.org.
added HttpConfigurator
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b8ccf9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b8ccf9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b8ccf9f6
Branch: refs/heads/STREAMS-168
Commit: b8ccf9f6869c64f1f6e25f668942ea0de9fec2eb
Parents: 34232ad
Author: sblackmon <sb...@apache.org>
Authored: Mon Sep 15 18:38:09 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Sep 15 18:38:09 2014 -0500
----------------------------------------------------------------------
.../components/http/HttpConfigurator.java | 53 ++++++++++++++++++++
.../components/http/SimpleHTTPGetProcessor.java | 23 +++++----
.../processor/TwitterUrlApiProcessor.java | 8 +++
3 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
new file mode 100644
index 0000000..36801b8
--- /dev/null
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.components.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
+ */
+public class HttpConfigurator {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(HttpConfigurator.class);
+
+ private final static ObjectMapper mapper = new ObjectMapper();
+
+ public static HttpProcessorConfiguration detectConfiguration(Config config) {
+
+ HttpProcessorConfiguration httpProcessorConfiguration = null;
+
+ try {
+ httpProcessorConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), HttpProcessorConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse http configuration", e.getMessage());
+ }
+ return httpProcessorConfiguration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
index d76d839..dec9d03 100644
--- a/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
+++ b/streams-components/streams-processor-http/src/main/java/org/apache/streams/components/http/SimpleHTTPGetProcessor.java
@@ -19,6 +19,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.util.ActivityUtil;
@@ -34,6 +35,7 @@ import javax.validation.ValidatorFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -59,6 +61,11 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
// //private PeoplePatternConfiguration peoplePatternConfiguration = null;
// //private String authHeader;
//
+ public SimpleHTTPGetProcessor() {
+ LOGGER.info("creating SimpleHTTPGetProcessor");
+ this.configuration = HttpConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("http"));
+ }
+
public SimpleHTTPGetProcessor(HttpProcessorConfiguration processorConfiguration) {
LOGGER.info("creating SimpleHTTPGetProcessor");
LOGGER.info(processorConfiguration.toString());
@@ -137,9 +144,7 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
return result;
}
- HttpGet httpget = new HttpGet(uri);
- httpget.addHeader("content-type", this.configuration.getContentType());
- //httpget.addHeader("Authorization", String.format("Basic %s", authHeader));
+ HttpGet httpget = prepareHttpGet(uri);
CloseableHttpResponse response = null;
@@ -184,6 +189,12 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
}
+ public HttpGet prepareHttpGet(URI uri) {
+ HttpGet httpget = new HttpGet(uri);
+ httpget.addHeader("content-type", this.configuration.getContentType());
+ return httpget;
+ }
+
@Override
public void prepare(Object configurationObject) {
@@ -198,12 +209,6 @@ public abstract class SimpleHTTPGetProcessor implements StreamsProcessor {
.setPath(this.configuration.getResourceUri());
httpclient = HttpClients.createDefault();
- // StringBuilder stringBuilder = new StringBuilder();
-// stringBuilder.append(peoplePatternConfiguration.getUsername());
-// stringBuilder.append(":");
-// stringBuilder.append(peoplePatternConfiguration.getPassword());
-// String string = stringBuilder.toString();
-// authHeader = Base64.encodeBase64String(string.getBytes());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8ccf9f6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
index 77965c4..438937f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java
@@ -5,6 +5,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.streams.components.http.HttpProcessorConfiguration;
import org.apache.streams.components.http.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
@@ -17,6 +18,13 @@ import java.util.Map;
*/
public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor {
+ public TwitterUrlApiProcessor() {
+ super();
+ this.configuration.setHostname("urls.api.twitter.com");
+ this.configuration.setResourceUri("/1/urls/count.json");
+ this.configuration.setExtension("twitter_url_count");
+ }
+
public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) {
super(processorConfiguration);
this.configuration.setHostname("urls.api.twitter.com");