You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/23 21:12:04 UTC

[19/28] incubator-streams git commit: basic implementation, confirmed working (although errors not uncommon) using https://github.com/w2ogroup/streams-examples.git/twitter-userstream-neo4j (STREAMS-231 branch)

basic implementation, confirmed working (although errors not uncommon) using https://github.com/w2ogroup/streams-examples.git/twitter-userstream-neo4j (STREAMS-231 branch)


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d471f34d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d471f34d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d471f34d

Branch: refs/heads/master
Commit: d471f34dc2a251963efec8a4130df0608ed70b4f
Parents: 6c32ce2
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sat Nov 22 21:00:42 2014 -0600
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sat Nov 22 21:01:12 2014 -0600

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 streams-contrib/streams-persist-graph/README.md |  44 +++++
 streams-contrib/streams-persist-graph/pom.xml   |  96 +++++++++
 .../streams/graph/GraphPersistWriter.java       | 193 +++++++++++++++++++
 .../streams/graph/neo4j/CypherGraphUtil.java    | 147 ++++++++++++++
 .../streams/graph/GraphConfiguration.json       |  19 ++
 .../graph/GraphEdgeWriterConfiguration.json     |  30 +++
 .../graph/GraphVertexWriterConfiguration.json   |  30 +++
 .../serializer/util/TwitterActivityUtil.java    |   7 +-
 .../apache/streams/data/util/PropertyUtil.java  |  89 +++++++++
 10 files changed, 655 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index fcec297..cc45923 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -40,6 +40,7 @@
         <module>streams-persist-cassandra</module>
         <module>streams-persist-console</module>
         <module>streams-persist-elasticsearch</module>
+        <module>streams-persist-graph</module>
         <module>streams-persist-hbase</module>
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/README.md b/streams-contrib/streams-persist-graph/README.md
new file mode 100644
index 0000000..086f5b5
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/README.md
@@ -0,0 +1,44 @@
+streams-persist-graph
+=====================
+
+Build graph index of stream
+
+Example Neo4J configuration:
+
+    {
+        "graph": {
+            "type": "neo4j",
+            "protocol": "http",
+            "hostname": "localhost",
+            "port": 7474,
+            "graph": "data"
+            "vertices": {
+                "verbs": [
+                    "post",
+                    "share",
+                    "tweet"
+                ],
+                "objectType": "page"
+            }
+        },
+    }
+
+Example Rexster configuration:
+
+    {
+        "graph": {
+            "type": "rexster",
+            "protocol": "http",
+            "hostname": "localhost",
+            "port": 8182,
+            "graph": "data",
+            "vertices": {
+                "verbs": [
+                    "post",
+                    "share",
+                    "tweet"
+                ],
+                "objectType": "page"
+            }
+        },
+    }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml
new file mode 100644
index 0000000..86458a5
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-graph</artifactId>
+    <name>streams-persist-graph</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>stringtemplate</artifactId>
+            <version>4.0.2</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.graph.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
new file mode 100644
index 0000000..352bc68
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.neo4j.CypherGraphUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class GraphPersistWriter extends SimpleHTTPPostPersistWriter {
+
+    public static final String STREAMS_ID = GraphPersistWriter.class.getCanonicalName();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GraphPersistWriter.class);
+    private final static long MAX_WRITE_LATENCY = 1000;
+
+    protected GraphWriterConfiguration configuration;
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+    private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public GraphPersistWriter() {
+        this(GraphConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("blueprints")));
+    }
+
+    public GraphPersistWriter(GraphWriterConfiguration configuration) {
+        super((HttpPersistWriterConfiguration)configuration);
+        if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J))
+            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
+        else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER))
+            super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
+        this.configuration = configuration;
+    }
+
+    @Override
+    protected ObjectNode preparePayload(StreamsDatum entry) {
+
+        Activity activity = null;
+
+        if (entry.getDocument() instanceof Activity) {
+            activity = (Activity) entry.getDocument();
+        } else if (entry.getDocument() instanceof ObjectNode) {
+            activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        } else if (entry.getDocument() instanceof String) {
+            try {
+                activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+            } catch (Throwable e) {
+                LOGGER.warn(e.getMessage());
+            }
+        }
+
+        Preconditions.checkNotNull(activity);
+
+        ObjectNode request = mapper.createObjectNode();
+        ArrayNode statements = mapper.createArrayNode();
+
+        activity.getActor().setObjectType("page");
+
+        // always add vertices first
+        // what types of verbs are relevant for adding vertices?
+        if( configuration.getVertices().getVerbs().contains(activity.getVerb())) {
+
+            // what objects and objectTypes are relevant for adding vertices?
+            if( configuration.getVertices().getObjects().contains("actor") &&
+                configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType())) {
+                statements.add(CypherGraphUtil.mergeVertexRequest(activity.getActor()));
+            }
+            if( configuration.getVertices().getObjects().contains("object") &&
+                configuration.getVertices().getObjectTypes().contains(activity.getObject().getObjectType())) {
+                statements.add(CypherGraphUtil.mergeVertexRequest(activity.getObject()));
+            }
+            if( configuration.getVertices().getObjects().contains("provider") &&
+                configuration.getVertices().getObjectTypes().contains(activity.getProvider().getObjectType())) {
+                statements.add(CypherGraphUtil.mergeVertexRequest(activity.getProvider()));
+            }
+            if( configuration.getVertices().getObjects().contains("target") &&
+                configuration.getVertices().getObjectTypes().contains(activity.getTarget().getObjectType())) {
+                statements.add(CypherGraphUtil.mergeVertexRequest(activity.getProvider()));
+            }
+
+        }
+
+        // what types of verbs are relevant for adding edges?
+        if( configuration.getEdges().getVerbs().contains(activity.getVerb())) {
+
+            // what objects and objectTypes are relevant for adding edges?
+            if( configuration.getEdges().getObjects().contains("actor") &&
+                configuration.getEdges().getObjects().contains("object") &&
+                configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) &&
+                configuration.getEdges().getObjectTypes().contains(activity.getObject().getObjectType())) {
+                statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getActor(), activity.getObject()));
+            }
+            if( configuration.getEdges().getObjects().contains("actor") &&
+                    configuration.getEdges().getObjects().contains("target") &&
+                    configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) &&
+                    configuration.getEdges().getObjectTypes().contains(activity.getTarget().getObjectType())) {
+                statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getActor(), activity.getTarget()));
+            }
+            if( configuration.getEdges().getObjects().contains("provider") &&
+                configuration.getEdges().getObjects().contains("actor") &&
+                configuration.getEdges().getObjectTypes().contains(activity.getProvider().getObjectType()) &&
+                configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())) {
+                statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getProvider(), activity.getActor()));
+            }
+        }
+
+        request.put("statements", statements);
+        return request;
+
+    }
+
+    @Override
+    protected ObjectNode executePost(HttpPost httpPost) {
+
+        Preconditions.checkNotNull(httpPost);
+
+        ObjectNode result = null;
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpPost);
+            HttpEntity entity = response.getEntity();
+            if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) {
+                entityString = EntityUtils.toString(entity);
+                result = mapper.readValue(entityString, ObjectNode.class);
+            }
+            LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString);
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
+        } finally {
+            try {
+                response.close();
+            } catch (IOException e) {}
+        }
+        return result;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        super.prepare(configurationObject);
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+        LOGGER.info("exiting");
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
new file mode 100644
index 0000000..92ee12f
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
@@ -0,0 +1,147 @@
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by steve on 11/13/14.
+ */
+public class CypherGraphUtil {
+
+    private final static ObjectMapper mapper = new StreamsJacksonMapper();
+
+    public final static String statementKey = "statement";
+    public final static String paramsKey = "parameters";
+    public final static String propsKey = "props";
+
+    public final static String getVertexStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
+
+    public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
+                                                                "CREATE UNIQUE (n:<type> { props }) "+
+                                                                "RETURN n";
+
+    public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
+                                                               "ON CREATE SET v:<type>";
+
+    public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
+                                                            "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
+                                                            "RETURN r";
+
+    public static ObjectNode getVertexRequest(String id) {
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST getVertex = new ST(getVertexStatementTemplate);
+        getVertex.add("id", id);
+        request.put(statementKey, getVertex.render());
+
+        return request;
+    }
+
+    public static ObjectNode createVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST createVertex = new ST(createVertexStatementTemplate);
+        createVertex.add("id", activityObject.getId());
+        createVertex.add("type", activityObject.getObjectType());
+        request.put(statementKey, createVertex.render());
+
+        ObjectNode params = mapper.createObjectNode();
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        ObjectNode props = PropertyUtil.flattenToObjectNode(object, '_');
+        params.put(propsKey, props);
+        request.put(paramsKey, params);
+
+        return request;
+    }
+
+    public static ObjectNode mergeVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ObjectNode request = mapper.createObjectNode();
+
+        ST mergeVertex = new ST(mergeVertexStatementTemplate);
+        mergeVertex.add("id", activityObject.getId());
+        mergeVertex.add("type", activityObject.getObjectType());
+
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '_');
+
+        String statement = mergeVertex.render();
+        statement += getPropertySetter(props, "v");
+        statement += (" RETURN v;");
+        request.put(statementKey, statement);
+
+        return request;
+    }
+
+    public static ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination) {
+
+        ObjectNode request = mapper.createObjectNode();
+
+        // set the activityObject's and extensions null, because their properties don't need to appear on the relationship
+        activity.setActor(null);
+        activity.setObject(null);
+        activity.setTarget(null);
+        activity.getAdditionalProperties().put("extensions", null);
+
+        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '_');
+
+        ST mergeEdge = new ST(createEdgeStatementTemplate);
+        mergeEdge.add("s_id", source.getId());
+        mergeEdge.add("s_type", source.getObjectType());
+        mergeEdge.add("d_id", destination.getId());
+        mergeEdge.add("d_type", destination.getObjectType());
+        mergeEdge.add("r_id", activity.getId());
+        mergeEdge.add("r_type", activity.getVerb());
+        mergeEdge.add("r_props", getPropertyCreater(props));
+
+        String statement = mergeEdge.render();
+        request.put(statementKey, statement);
+
+        return request;
+    }
+
+    public static String getPropertySetter(Map<String, Object> map, String symbol) {
+        StringBuilder builder = new StringBuilder();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                String propVal = (String)(entry.getValue());
+                builder.append("," + symbol + "." + entry.getKey() + " = '" + propVal + "'");
+            }
+        }
+        return builder.toString();
+    }
+
+    public static String getPropertyCreater(Map<String, Object> map) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("{");
+        List<String> parts = Lists.newArrayList();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                String propVal = (String) (entry.getValue());
+                parts.add(entry.getKey() + ":'" + propVal + "'");
+            }
+        }
+        builder.append(Joiner.on(",").join(parts));
+        builder.append("}");
+        return builder.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
new file mode 100644
index 0000000..1e1fac4
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
@@ -0,0 +1,19 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphConfiguration",
+    "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpPersistWriterConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "type": {
+            "type": "string",
+            "description": "Graph DB type",
+            "enum" : ["neo4j", "rexster"]
+        },
+        "graph": {
+            "type": "string",
+            "description": "Graph DB Graph ID"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
new file mode 100644
index 0000000..f9e3868
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
@@ -0,0 +1,30 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphEdgeWriterConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "objects": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        },
+        "verbs": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        },
+        "objectTypes": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
new file mode 100644
index 0000000..798f4f6
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
@@ -0,0 +1,30 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphVertexWriterConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "objects": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        },
+        "verbs": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        },
+        "objectTypes": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
index 56b7005..3407da7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
@@ -87,6 +87,7 @@ public class TwitterActivityUtil {
     public static void updateActivity(User user, Activity activity) throws ActivitySerializerException {
         activity.setActor(buildActor(user));
         activity.setId(null);
+        activity.setVerb(null);
     }
 
     /**
@@ -114,6 +115,7 @@ public class TwitterActivityUtil {
     public static  Actor buildActor(Delete delete) {
         Actor actor = new Actor();
         actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+        actor.setObjectType("page");
         return actor;
     }
 
@@ -161,7 +163,7 @@ public class TwitterActivityUtil {
                 .orNull();
         if( id != null )
             actObj.setId(id);
-        actObj.setObjectType("tweet");
+        actObj.setObjectType("post");
         actObj.setContent(tweet.getText());
         return actObj;
     }
@@ -191,6 +193,7 @@ public class TwitterActivityUtil {
                         .or(Optional.of(user.getId().toString()))
                         .orNull()
         ));
+        actor.setObjectType("page");
 
         actor.setDisplayName(user.getName());
         actor.setAdditionalProperty("handle", user.getScreenName());
@@ -267,7 +270,9 @@ public class TwitterActivityUtil {
     public static Provider getProvider() {
         Provider provider = new Provider();
         provider.setId("id:providers:twitter");
+        provider.setObjectType("application");
         provider.setDisplayName("Twitter");
+
         return provider;
     }
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
new file mode 100644
index 0000000..dbdff3b
--- /dev/null
+++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.ValueNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ *  Class transforms nested properties of activities, actors, objects, etc...
+ */
+public class PropertyUtil {
+
+    /**
+     * Property on the activity object to use for extensions
+     */
+    public static final String EXTENSION_PROPERTY = "extensions";
+
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    public static Map<String, Object> flattenToMap(ObjectNode object) {
+        Map<String, Object> flatObject = Maps.newHashMap();
+        addKeys(new String(), object, flatObject, '.');
+        return flatObject;
+    }
+
+    public static ObjectNode flattenToObjectNode(ObjectNode object) {
+        Map<String, Object> flatObject = flattenToMap(object, '.');
+        addKeys(new String(), object, flatObject, '.');
+        return mapper.convertValue(flatObject, ObjectNode.class);
+    }
+
+    public static Map<String, Object> flattenToMap(ObjectNode object, char seperator) {
+        Map<String, Object> flatObject = Maps.newHashMap();
+        addKeys(new String(), object, flatObject, seperator);
+        return flatObject;
+    }
+
+    public static ObjectNode flattenToObjectNode(ObjectNode object, char seperator) {
+        Map<String, Object> flatObject = flattenToMap(object, seperator);
+        addKeys(new String(), object, flatObject, seperator);
+        return mapper.convertValue(flatObject, ObjectNode.class);
+    }
+
+    private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, Object> map, char seperator) {
+        if (jsonNode.isObject()) {
+            ObjectNode objectNode = (ObjectNode) jsonNode;
+            Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
+            String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator;
+
+            while (iter.hasNext()) {
+                Map.Entry<String, JsonNode> entry = iter.next();
+                addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator);
+            }
+        } else if (jsonNode.isArray()) {
+            ArrayNode arrayNode = (ArrayNode) jsonNode;
+            map.put(currentPath, arrayNode);
+        } else if (jsonNode.isValueNode()) {
+            ValueNode valueNode = (ValueNode) jsonNode;
+            if( valueNode.isTextual() )
+                map.put(currentPath, valueNode.asText());
+            else if ( valueNode.isNumber() )
+                map.put(currentPath, valueNode);
+        }
+    }
+}