You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/23 21:12:04 UTC
[19/28] incubator-streams git commit: basic implementation,
confirmed working (although errors not uncommon) using
https://github.com/w2ogroup/streams-examples.git/twitter-userstream-neo4j
(STREAMS-231 branch)
basic implementation, confirmed working (although errors not uncommon) using https://github.com/w2ogroup/streams-examples.git/twitter-userstream-neo4j (STREAMS-231 branch)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d471f34d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d471f34d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d471f34d
Branch: refs/heads/master
Commit: d471f34dc2a251963efec8a4130df0608ed70b4f
Parents: 6c32ce2
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sat Nov 22 21:00:42 2014 -0600
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sat Nov 22 21:01:12 2014 -0600
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
streams-contrib/streams-persist-graph/README.md | 44 +++++
streams-contrib/streams-persist-graph/pom.xml | 96 +++++++++
.../streams/graph/GraphPersistWriter.java | 193 +++++++++++++++++++
.../streams/graph/neo4j/CypherGraphUtil.java | 147 ++++++++++++++
.../streams/graph/GraphConfiguration.json | 19 ++
.../graph/GraphEdgeWriterConfiguration.json | 30 +++
.../graph/GraphVertexWriterConfiguration.json | 30 +++
.../serializer/util/TwitterActivityUtil.java | 7 +-
.../apache/streams/data/util/PropertyUtil.java | 89 +++++++++
10 files changed, 655 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index fcec297..cc45923 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -40,6 +40,7 @@
<module>streams-persist-cassandra</module>
<module>streams-persist-console</module>
<module>streams-persist-elasticsearch</module>
+ <module>streams-persist-graph</module>
<module>streams-persist-hbase</module>
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/README.md b/streams-contrib/streams-persist-graph/README.md
new file mode 100644
index 0000000..086f5b5
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/README.md
@@ -0,0 +1,44 @@
+streams-persist-graph
+=====================
+
+Build graph index of stream
+
+Example Neo4J configuration:
+
+ {
+ "graph": {
+ "type": "neo4j",
+ "protocol": "http",
+ "hostname": "localhost",
+ "port": 7474,
+ "graph": "data"
+ "vertices": {
+ "verbs": [
+ "post",
+ "share",
+ "tweet"
+ ],
+ "objectType": "page"
+ }
+ },
+ }
+
+Example Rexster configuration:
+
+ {
+ "graph": {
+ "type": "rexster",
+ "protocol": "http",
+ "hostname": "localhost",
+ "port": 8182,
+ "graph": "data",
+ "vertices": {
+ "verbs": [
+ "post",
+ "share",
+ "tweet"
+ ],
+ "objectType": "page"
+ }
+ },
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml
new file mode 100644
index 0000000..86458a5
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-contrib</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-graph</artifactId>
+ <name>streams-persist-graph</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>stringtemplate</artifactId>
+ <version>4.0.2</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.graph.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
new file mode 100644
index 0000000..352bc68
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.neo4j.CypherGraphUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Persist writer that turns activity documents into graph statements and POSTs them
+ * over HTTP.
+ *
+ * <p>For each datum the document is converted to an {@link Activity}; vertex merge
+ * statements are emitted first for the configured roles (actor, object, provider,
+ * target), followed by edge statements between them. All statements are built by
+ * {@link CypherGraphUtil}, regardless of the configured graph type.</p>
+ */
+public class GraphPersistWriter extends SimpleHTTPPostPersistWriter {
+
+    public static final String STREAMS_ID = GraphPersistWriter.class.getCanonicalName();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GraphPersistWriter.class);
+
+    // The next three members are declared but not referenced anywhere in this class.
+    private final static long MAX_WRITE_LATENCY = 1000;
+
+    protected GraphWriterConfiguration configuration;
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+    private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Creates a writer from the "blueprints" section of the global streams configuration.
+     */
+    public GraphPersistWriter() {
+        // NOTE(review): this reads the "blueprints" section; confirm that is the intended
+        // configuration key for this module.
+        this(GraphConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("blueprints")));
+    }
+
+    /**
+     * Creates a writer for the given configuration and derives the HTTP resource path
+     * from the configured graph type and graph id.
+     *
+     * @param configuration writer configuration; its type selects the resource path layout
+     */
+    public GraphPersistWriter(GraphWriterConfiguration configuration) {
+        super((HttpPersistWriterConfiguration)configuration);
+        if (configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) {
+            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
+        } else if (configuration.getType().equals(GraphConfiguration.Type.REXSTER)) {
+            super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
+        }
+        this.configuration = configuration;
+    }
+
+    /**
+     * Builds the request payload for one datum: an object with a single "statements"
+     * array holding vertex merges followed by edge creations.
+     *
+     * @param entry datum whose document is an Activity, an ObjectNode or a JSON String
+     * @return the request body to POST
+     * @throws NullPointerException if the document cannot be converted to an Activity
+     */
+    @Override
+    protected ObjectNode preparePayload(StreamsDatum entry) {
+
+        Activity activity = toActivity(entry.getDocument());
+
+        Preconditions.checkNotNull(activity);
+
+        ObjectNode request = mapper.createObjectNode();
+        ArrayNode statements = mapper.createArrayNode();
+
+        // Read the role objects once, up front: building an edge request may clear these
+        // references on the activity, and the later checks must still see them.
+        ActivityObject actor = activity.getActor();
+        ActivityObject object = activity.getObject();
+        ActivityObject provider = activity.getProvider();
+        ActivityObject target = activity.getTarget();
+
+        // The actor's objectType is always forced to "page" before matching.
+        if (actor != null) {
+            actor.setObjectType("page");
+        }
+
+        // always add vertices first
+        // what types of verbs are relevant for adding vertices?
+        if (configuration.getVertices() != null
+                && configuration.getVertices().getVerbs().contains(activity.getVerb())) {
+
+            // what objects and objectTypes are relevant for adding vertices?
+            if (actor != null
+                    && configuration.getVertices().getObjects().contains("actor")
+                    && configuration.getVertices().getObjectTypes().contains(actor.getObjectType())) {
+                statements.add(CypherGraphUtil.mergeVertexRequest(actor));
+            }
+            if (object != null
+                    && configuration.getVertices().getObjects().contains("object")
+                    && configuration.getVertices().getObjectTypes().contains(object.getObjectType())) {
+                statements.add(CypherGraphUtil.mergeVertexRequest(object));
+            }
+            if (provider != null
+                    && configuration.getVertices().getObjects().contains("provider")
+                    && configuration.getVertices().getObjectTypes().contains(provider.getObjectType())) {
+                statements.add(CypherGraphUtil.mergeVertexRequest(provider));
+            }
+            if (target != null
+                    && configuration.getVertices().getObjects().contains("target")
+                    && configuration.getVertices().getObjectTypes().contains(target.getObjectType())) {
+                // merge the target itself (previously the provider was merged here by mistake)
+                statements.add(CypherGraphUtil.mergeVertexRequest(target));
+            }
+
+        }
+
+        // what types of verbs are relevant for adding edges?
+        if (configuration.getEdges() != null
+                && configuration.getEdges().getVerbs().contains(activity.getVerb())) {
+
+            // what objects and objectTypes are relevant for adding edges?
+            if (actor != null && object != null
+                    && configuration.getEdges().getObjects().contains("actor")
+                    && configuration.getEdges().getObjects().contains("object")
+                    && configuration.getEdges().getObjectTypes().contains(actor.getObjectType())
+                    && configuration.getEdges().getObjectTypes().contains(object.getObjectType())) {
+                statements.add(CypherGraphUtil.createEdgeRequest(activity, actor, object));
+            }
+            if (actor != null && target != null
+                    && configuration.getEdges().getObjects().contains("actor")
+                    && configuration.getEdges().getObjects().contains("target")
+                    && configuration.getEdges().getObjectTypes().contains(actor.getObjectType())
+                    && configuration.getEdges().getObjectTypes().contains(target.getObjectType())) {
+                statements.add(CypherGraphUtil.createEdgeRequest(activity, actor, target));
+            }
+            if (provider != null && actor != null
+                    && configuration.getEdges().getObjects().contains("provider")
+                    && configuration.getEdges().getObjects().contains("actor")
+                    && configuration.getEdges().getObjectTypes().contains(provider.getObjectType())
+                    && configuration.getEdges().getObjectTypes().contains(actor.getObjectType())) {
+                statements.add(CypherGraphUtil.createEdgeRequest(activity, provider, actor));
+            }
+        }
+
+        request.put("statements", statements);
+        return request;
+
+    }
+
+    /**
+     * Converts a datum document to an Activity.
+     *
+     * @param document an Activity, an ObjectNode or a JSON String
+     * @return the Activity, or null when the type is unsupported or the String cannot be parsed
+     */
+    private Activity toActivity(Object document) {
+        if (document instanceof Activity) {
+            return (Activity) document;
+        }
+        if (document instanceof ObjectNode) {
+            return mapper.convertValue(document, Activity.class);
+        }
+        if (document instanceof String) {
+            try {
+                return mapper.readValue((String) document, Activity.class);
+            } catch (IOException e) {
+                LOGGER.warn("Unable to parse document as Activity", e);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Executes the POST and parses the response body as JSON.
+     *
+     * @param httpPost the prepared request; must not be null
+     * @return the parsed response body, or null when the status is neither 200 nor 201,
+     *         the response has no entity, or an I/O error occurred
+     */
+    @Override
+    protected ObjectNode executePost(HttpPost httpPost) {
+
+        Preconditions.checkNotNull(httpPost);
+
+        ObjectNode result = null;
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpPost);
+            HttpEntity entity = response.getEntity();
+            int status = response.getStatusLine().getStatusCode();
+            // parenthesized so the entity null-check applies to both accepted status codes
+            if ((status == 200 || status == 201) && entity != null) {
+                entityString = EntityUtils.toString(entity);
+                result = mapper.readValue(entityString, ObjectNode.class);
+            }
+            LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), status, entityString);
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
+        } finally {
+            // response stays null when execute() itself failed
+            if (response != null) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                    LOGGER.warn("Unable to close response", e);
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Delegates to the superclass; no additional preparation is done here.
+     *
+     * @param configurationObject passed through to the superclass unchanged
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        super.prepare(configurationObject);
+
+    }
+
+    /**
+     * Logs that the writer is exiting.
+     */
+    @Override
+    public void cleanUp() {
+
+        // NOTE(review): this override does not call super.cleanUp(); confirm the
+        // superclass has no resources that need releasing here.
+        LOGGER.info("exiting");
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
new file mode 100644
index 0000000..92ee12f
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
@@ -0,0 +1,147 @@
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers that build Cypher statement requests (JSON objects with a
+ * "statement" and optionally "parameters") for activity objects and activities.
+ *
+ * <p>String values and ids are embedded in single-quoted Cypher literals, so they are
+ * escaped before being spliced into a statement.</p>
+ */
+public class CypherGraphUtil {
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    public final static String statementKey = "statement";
+    public final static String paramsKey = "parameters";
+    public final static String propsKey = "props";
+
+    public final static String getVertexStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
+
+    public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
+            "CREATE UNIQUE (n:<type> { props }) "+
+            "RETURN n";
+
+    public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
+            "ON CREATE SET v:<type>";
+
+    public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
+            "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
+            "RETURN r";
+
+    /**
+     * Builds a request that looks up a vertex by id.
+     *
+     * @param id vertex id to match
+     * @return request containing the rendered statement
+     */
+    public static ObjectNode getVertexRequest(String id) {
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST getVertex = new ST(getVertexStatementTemplate);
+        getVertex.add("id", escape(id));
+        request.put(statementKey, getVertex.render());
+
+        return request;
+    }
+
+    /**
+     * Builds a request that creates a vertex, passing the flattened object as the
+     * "props" parameter.
+     *
+     * @param activityObject object to create; its objectType must not be null
+     * @return request containing the statement and its parameters
+     */
+    public static ObjectNode createVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST createVertex = new ST(createVertexStatementTemplate);
+        createVertex.add("id", escape(activityObject.getId()));
+        createVertex.add("type", activityObject.getObjectType());
+        request.put(statementKey, createVertex.render());
+
+        ObjectNode params = mapper.createObjectNode();
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        ObjectNode props = PropertyUtil.flattenToObjectNode(object, '_');
+        params.put(propsKey, props);
+        request.put(paramsKey, params);
+
+        return request;
+    }
+
+    /**
+     * Builds a request that merges a vertex by type and id and, on creation, sets its
+     * label and every flattened String property.
+     *
+     * @param activityObject object to merge; its objectType must not be null
+     * @return request containing the statement
+     */
+    public static ObjectNode mergeVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST mergeVertex = new ST(mergeVertexStatementTemplate);
+        mergeVertex.add("id", escape(activityObject.getId()));
+        mergeVertex.add("type", activityObject.getObjectType());
+
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '_');
+
+        // the setter fragment starts with a comma, continuing the "ON CREATE SET" list
+        String statement = mergeVertex.render()
+                + getPropertySetter(props, "v")
+                + " RETURN v;";
+        request.put(statementKey, statement);
+
+        return request;
+    }
+
+    /**
+     * Builds a request that creates a relationship from source to destination, typed
+     * by the activity verb and carrying the activity's flattened String properties.
+     *
+     * <p>The activity passed in is not modified.</p>
+     *
+     * @param activity activity supplying the relationship type and properties
+     * @param source start vertex of the relationship
+     * @param destination end vertex of the relationship
+     * @return request containing the statement
+     */
+    public static ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination) {
+
+        ObjectNode request = mapper.createObjectNode();
+
+        // Drop the activity objects and extensions from the serialized copy, because their
+        // properties don't need to appear on the relationship. Working on the copy keeps the
+        // caller's activity intact for any further use.
+        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
+        object.remove("actor");
+        object.remove("object");
+        object.remove("target");
+        object.remove("extensions");
+
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '_');
+
+        ST mergeEdge = new ST(createEdgeStatementTemplate);
+        mergeEdge.add("s_id", escape(source.getId()));
+        mergeEdge.add("s_type", source.getObjectType());
+        mergeEdge.add("d_id", escape(destination.getId()));
+        mergeEdge.add("d_type", destination.getObjectType());
+        mergeEdge.add("r_id", activity.getId());
+        mergeEdge.add("r_type", activity.getVerb());
+        mergeEdge.add("r_props", getPropertyCreater(props));
+
+        String statement = mergeEdge.render();
+        request.put(statementKey, statement);
+
+        return request;
+    }
+
+    /**
+     * Renders the String-valued entries of the map as a comma-prefixed list of
+     * property assignments, e.g. {@code ,v.name = 'x'}. Non-String values are skipped.
+     *
+     * @param map flattened properties
+     * @param symbol statement variable the properties are set on
+     * @return the assignment fragment, empty when there are no String values
+     */
+    public static String getPropertySetter(Map<String, Object> map, String symbol) {
+        StringBuilder builder = new StringBuilder();
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            if (entry.getValue() instanceof String) {
+                String propVal = escape((String) entry.getValue());
+                builder.append(',').append(symbol).append('.').append(entry.getKey())
+                        .append(" = '").append(propVal).append('\'');
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Renders the String-valued entries of the map as a property map literal, e.g.
+     * {@code {name:'x',verb:'post'}}. Non-String values are skipped.
+     *
+     * @param map flattened properties
+     * @return the map literal, {@code {}} when there are no String values
+     */
+    public static String getPropertyCreater(Map<String, Object> map) {
+        List<String> parts = Lists.newArrayList();
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            if (entry.getValue() instanceof String) {
+                String propVal = escape((String) entry.getValue());
+                parts.add(entry.getKey() + ":'" + propVal + "'");
+            }
+        }
+        return "{" + Joiner.on(",").join(parts) + "}";
+    }
+
+    /**
+     * Escapes backslashes and single quotes so the value can sit inside a single-quoted
+     * literal without terminating it.
+     *
+     * @param value raw value, may be null
+     * @return escaped value, or null when the input is null
+     */
+    private static String escape(String value) {
+        if (value == null) {
+            return null;
+        }
+        // backslashes first, so the ones introduced for quotes are not doubled
+        return value.replace("\\", "\\\\").replace("'", "\\'");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
new file mode 100644
index 0000000..1e1fac4
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
@@ -0,0 +1,19 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.graph.GraphConfiguration",
+ "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpPersistWriterConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "type": {
+ "type": "string",
+ "description": "Graph DB type",
+ "enum" : ["neo4j", "rexster"]
+ },
+ "graph": {
+ "type": "string",
+ "description": "Graph DB Graph ID"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
new file mode 100644
index 0000000..f9e3868
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
@@ -0,0 +1,30 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.graph.GraphEdgeWriterConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "objects": {
+ "type": "array",
+ "required": false,
+ "items": {
+ "type": "string"
+ }
+ },
+ "verbs": {
+ "type": "array",
+ "required": false,
+ "items": {
+ "type": "string"
+ }
+ },
+ "objectTypes": {
+ "type": "array",
+ "required": false,
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
new file mode 100644
index 0000000..798f4f6
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
@@ -0,0 +1,30 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.graph.GraphVertexWriterConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "objects": {
+ "type": "array",
+ "required": false,
+ "items": {
+ "type": "string"
+ }
+ },
+ "verbs": {
+ "type": "array",
+ "required": false,
+ "items": {
+ "type": "string"
+ }
+ },
+ "objectTypes": {
+ "type": "array",
+ "required": false,
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
index 56b7005..3407da7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
@@ -87,6 +87,7 @@ public class TwitterActivityUtil {
public static void updateActivity(User user, Activity activity) throws ActivitySerializerException {
activity.setActor(buildActor(user));
activity.setId(null);
+ activity.setVerb(null);
}
/**
@@ -114,6 +115,7 @@ public class TwitterActivityUtil {
public static Actor buildActor(Delete delete) {
Actor actor = new Actor();
actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+ actor.setObjectType("page");
return actor;
}
@@ -161,7 +163,7 @@ public class TwitterActivityUtil {
.orNull();
if( id != null )
actObj.setId(id);
- actObj.setObjectType("tweet");
+ actObj.setObjectType("post");
actObj.setContent(tweet.getText());
return actObj;
}
@@ -191,6 +193,7 @@ public class TwitterActivityUtil {
.or(Optional.of(user.getId().toString()))
.orNull()
));
+ actor.setObjectType("page");
actor.setDisplayName(user.getName());
actor.setAdditionalProperty("handle", user.getScreenName());
@@ -267,7 +270,9 @@ public class TwitterActivityUtil {
public static Provider getProvider() {
Provider provider = new Provider();
provider.setId("id:providers:twitter");
+ provider.setObjectType("application");
provider.setDisplayName("Twitter");
+
return provider;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
new file mode 100644
index 0000000..dbdff3b
--- /dev/null
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.ValueNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Flattens nested JSON objects (activities, actors, objects, etc...) into a single
+ * level of properties whose keys are the nested field names joined by a separator.
+ *
+ * <p>Only textual and numeric leaf values and arrays are kept; other value nodes
+ * (for example booleans and nulls) are skipped. Arrays are stored whole under their
+ * path rather than being flattened.</p>
+ */
+public class PropertyUtil {
+
+    /**
+     * Property on the activity object to use for extensions
+     */
+    public static final String EXTENSION_PROPERTY = "extensions";
+
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /**
+     * Flattens the object into a map using '.' as the path separator.
+     *
+     * @param object object to flatten
+     * @return map from joined path to value
+     */
+    public static Map<String, Object> flattenToMap(ObjectNode object) {
+        return flattenToMap(object, '.');
+    }
+
+    /**
+     * Flattens the object into a single-level ObjectNode using '.' as the path separator.
+     *
+     * @param object object to flatten
+     * @return flattened object
+     */
+    public static ObjectNode flattenToObjectNode(ObjectNode object) {
+        return flattenToObjectNode(object, '.');
+    }
+
+    /**
+     * Flattens the object into a map using the given path separator.
+     *
+     * @param object object to flatten
+     * @param seperator character placed between nested field names
+     * @return map from joined path to value
+     */
+    public static Map<String, Object> flattenToMap(ObjectNode object, char seperator) {
+        Map<String, Object> flatObject = Maps.newHashMap();
+        addKeys("", object, flatObject, seperator);
+        return flatObject;
+    }
+
+    /**
+     * Flattens the object into a single-level ObjectNode using the given path separator.
+     *
+     * @param object object to flatten
+     * @param seperator character placed between nested field names
+     * @return flattened object
+     */
+    public static ObjectNode flattenToObjectNode(ObjectNode object, char seperator) {
+        // flattenToMap already walks the whole tree; one pass is enough
+        return mapper.convertValue(flattenToMap(object, seperator), ObjectNode.class);
+    }
+
+    /**
+     * Recursively collects the leaves below jsonNode into the map.
+     *
+     * @param currentPath joined path of jsonNode, empty for the root
+     * @param jsonNode node to visit
+     * @param map receives one entry per kept leaf
+     * @param seperator character placed between nested field names
+     */
+    private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, Object> map, char seperator) {
+        if (jsonNode.isObject()) {
+            ObjectNode objectNode = (ObjectNode) jsonNode;
+            Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
+            String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator;
+
+            while (iter.hasNext()) {
+                Map.Entry<String, JsonNode> entry = iter.next();
+                addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator);
+            }
+        } else if (jsonNode.isArray()) {
+            // arrays are kept as a single value, not expanded element by element
+            ArrayNode arrayNode = (ArrayNode) jsonNode;
+            map.put(currentPath, arrayNode);
+        } else if (jsonNode.isValueNode()) {
+            ValueNode valueNode = (ValueNode) jsonNode;
+            if (valueNode.isTextual()) {
+                map.put(currentPath, valueNode.asText());
+            } else if (valueNode.isNumber()) {
+                // numbers stay wrapped in their JSON node
+                map.put(currentPath, valueNode);
+            }
+        }
+    }
+}