You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/07 20:11:37 UTC
[06/12] incubator-streams git commit: committing PeoplePattern
processor
committing PeoplePattern processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9a575322
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9a575322
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9a575322
Branch: refs/heads/master
Commit: 9a575322231e4a8a69c56f06da743ffe3211ccb4
Parents: d62061d
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Oct 12 20:20:52 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Oct 12 20:20:52 2014 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 5 +-
.../streams-processor-peoplepattern/pom.xml | 138 +++++++++++++++++++
.../peoplepattern/AccountTypeProcessor.java | 76 ++++++++++
.../peoplepattern/DemographicsProcessor.java | 77 +++++++++++
.../streams/peoplepattern/AccountType.json | 27 ++++
.../streams/peoplepattern/Demographics.json | 60 ++++++++
.../resources/templates/peoplepatternactor.json | 25 ++++
7 files changed, 406 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index e290466..fcec297 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,17 +44,18 @@
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
- <module>streams-amazon-aws</module>
+ <module>streams-amazon-aws</module>
<!--<module>streams-processor-lucene</module>-->
<!--<module>streams-processor-tika</module>-->
- <module>streams-provider-instagram</module>
<module>streams-processor-jackson</module>
<module>streams-processor-json</module>
<module>streams-processor-urls</module>
+ <module>streams-processor-peoplepattern</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
<module>streams-provider-google</module>
<module>streams-provider-gnip</module>
+ <module>streams-provider-instagram</module>
<module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
<module>streams-provider-sysomos</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/pom.xml b/streams-contrib/streams-processor-peoplepattern/pom.xml
new file mode 100644
index 0000000..b810200
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-processor-peoplepattern</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <!-- Registers generated-code directories as compile source roots; add-source takes directories, not file globs. -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>add-source-jaxb2</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jaxb2</source> <!-- NOTE(review): no JAXB generation is configured in this POM; confirm this root is needed -->
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- Generates the org.apache.streams.peoplepattern POJOs from the JSON schemas under src/main/jsonschema. -->
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/peoplepattern</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.peoplepattern</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
new file mode 100644
index 0000000..d180b7f
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Enrich actor with account type
+ */
+public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
+
+    public final static String STREAMS_ID = "AccountTypeProcessor";
+    private final static Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class);
+
+    /** Creates the processor from the configuration detected under the "peoplepattern" key of the streams config. */
+    public AccountTypeProcessor() {
+        this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern")));
+    }
+
+    /** Creates the processor for https://api.peoplepattern.com/v0.2/account_type/, entity ACTOR, extension "account_type". */
+    public AccountTypeProcessor(HttpProcessorConfiguration peoplePatternConfiguration) {
+        super(peoplePatternConfiguration);
+        LOGGER.info("creating AccountTypeProcessor");
+        configuration.setProtocol("https");
+        configuration.setHostname("api.peoplepattern.com");
+        configuration.setResourcePath("/v0.2/account_type/");
+        configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
+        configuration.setExtension("account_type");
+    }
+
+    /**
+     * Builds the request parameters ("id", "name", "username", "description") from the actor of the datum's activity.
+     */
+    @Override
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        Actor actor = activity.getActor();
+        ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
+        String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+        Map<String, String> params = Maps.newHashMap();
+        params.put("id", actor.getId());
+        params.put("name", actor.getDisplayName());
+        params.put("username", username); // presumably null when the actor has no "screenName" extension - TODO confirm
+        params.put("description", actor.getSummary());
+        return params;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
new file mode 100644
index 0000000..6ffbb9b
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.peoplepattern;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.components.http.HttpConfigurator;
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.data.util.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Enrich actor with demographics
+ */
+public class DemographicsProcessor extends SimpleHTTPGetProcessor {
+
+    public final static String STREAMS_ID = "DemographicsProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class);
+
+    /** Creates the processor from the configuration detected under the "peoplepattern" key of the streams config. */
+    public DemographicsProcessor() {
+        this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern")));
+    }
+
+    /** Creates the processor for https://api.peoplepattern.com/v0.2/demographics/, entity ACTOR, extension "demographics". */
+    public DemographicsProcessor(HttpProcessorConfiguration peoplePatternConfiguration) {
+        super(peoplePatternConfiguration);
+        LOGGER.info("creating DemographicsProcessor");
+        configuration.setProtocol("https");
+        configuration.setHostname("api.peoplepattern.com");
+        configuration.setResourcePath("/v0.2/demographics/");
+        configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
+        configuration.setExtension("demographics");
+    }
+
+    /**
+     * Builds the request parameters ("id", "name", "username", "description") from the actor of the datum's activity.
+     */
+    @Override
+    protected Map<String, String> prepareParams(StreamsDatum entry) {
+        Activity activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        Actor actor = activity.getActor();
+        ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class);
+        String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName");
+        Map<String, String> params = Maps.newHashMap();
+        params.put("id", actor.getId());
+        params.put("name", actor.getDisplayName());
+        params.put("username", username); // presumably null when the actor has no "screenName" extension - TODO confirm
+        params.put("description", actor.getSummary());
+        return params;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
new file mode 100644
index 0000000..5656b44
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/AccountType.json
@@ -0,0 +1,27 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType": "org.apache.streams.peoplepattern.AccountType",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "prediction" : {
+ "type" : "string",
+ "enum" : [
+ "person",
+ "organization",
+ "entertainment",
+ "adult",
+ "spam",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ },
+ "id": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
new file mode 100644
index 0000000..d1f64d8
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/jsonschema/org/apache/streams/peoplepattern/Demographics.json
@@ -0,0 +1,60 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType": "org.apache.streams.peoplepattern.Demographics",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "age": {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "integer",
+ "default": 1990
+ },
+ "score": {
+ "type": "number"
+ }
+
+ }
+ },
+ "gender" : {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "string",
+ "enum": [
+ "male",
+ "female",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ }
+ }
+ },
+ "race" : {
+ "type": "object",
+ "properties": {
+ "prediction": {
+ "type": "string",
+ "enum": [
+ "black",
+ "east-asian",
+ "hispanic",
+ "middle-eastern",
+ "south-asian",
+ "white",
+ "no-prediction"
+ ],
+ "default": "no-prediction"
+ },
+ "score": {
+ "type": "number"
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9a575322/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
new file mode 100644
index 0000000..9a24c5c
--- /dev/null
+++ b/streams-contrib/streams-processor-peoplepattern/src/main/resources/templates/peoplepatternactor.json
@@ -0,0 +1,25 @@
+{
+ "order": 20,
+ "template": "*activity*",
+ "settings": {},
+ "mappings": {
+ "activity": {
+ "properties": {
+ "actor": {
+ "properties": {
+ "extensions": {
+ "properties": {
+ "account_type": {
+ "type": "nested"
+ },
+ "demographics": {
+ "type": "nested"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file