You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/05/01 00:03:10 UTC

incubator-streams git commit: resolves STREAMS-313 #313: neo4j runtime dependencies are scope:provided and optional; using the existing Http-based writer does not require neo4j libraries; using the Binary writer does, but it's up to the implementing stream to import it

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-313 [created] cb1aaea3e


resolves STREAMS-313 #313
neo4j runtime dependencies are scope:provided and optional
using the existing Http-based writer does not require neo4j libraries
using the Binary writer does, but it's up to the implementing stream to import it


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cb1aaea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cb1aaea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cb1aaea3

Branch: refs/heads/STREAMS-313
Commit: cb1aaea3e127cf4b9b72dd110883765450c74976
Parents: 6a05779
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Apr 30 17:03:05 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Apr 30 17:03:05 2015 -0500

----------------------------------------------------------------------
 streams-contrib/streams-persist-graph/pom.xml   |  97 +++++++++
 .../streams/graph/GraphHttpPersistWriter.java   | 205 ++++++++++++++++++
 .../streams/graph/GraphPersistWriter.java       | 216 -------------------
 .../apache/streams/graph/GraphVertexReader.java |   4 +-
 .../apache/streams/graph/HttpGraphHelper.java   |  36 ++++
 .../apache/streams/graph/QueryGraphHelper.java  |  45 ++++
 .../streams/graph/neo4j/BinaryGraphHelper.java  | 109 ++++++++++
 .../graph/neo4j/CypherQueryGraphHelper.java     | 183 ++++++++++++++++
 .../neo4j/Neo4jBinaryGraphPersistWriter.java    | 200 +++++++++++++++++
 .../graph/neo4j/Neo4jBinaryGraphUtil.java       |  95 ++++++++
 .../graph/neo4j/Neo4jHttpGraphHelper.java       |  67 ++++++
 .../streams/graph/GraphBinaryConfiguration.json |  28 +++
 .../graph/GraphEdgeWriterConfiguration.json     |  33 ---
 .../streams/graph/GraphHttpConfiguration.json   |  22 ++
 .../streams/graph/GraphReaderConfiguration.json |   2 +-
 .../graph/GraphVertexWriterConfiguration.json   |  33 ---
 .../streams/graph/GraphWriterConfiguration.json |  21 --
 .../graph/test/Neo4jHttpPersistWriterIT.java    | 102 +++++++++
 .../test/TestNeo4jBinaryPersistWriter.java      | 162 ++++++++++++++
 .../graph/test/TestNeo4jVertexReader.java       |   4 +-
 20 files changed, 1356 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml
index a7b5c1d..edbae7e 100644
--- a/streams-contrib/streams-persist-graph/pom.xml
+++ b/streams-contrib/streams-persist-graph/pom.xml
@@ -50,6 +50,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -63,6 +69,48 @@
             <artifactId>stringtemplate</artifactId>
             <version>4.0.2</version>
         </dependency>
+        <dependency>
+            <groupId>org.javatuples</groupId>
+            <artifactId>javatuples</artifactId>
+            <version>1.2</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.neo4j</groupId>
+            <artifactId>neo4j-kernel</artifactId>
+            <version>2.2.1</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j</groupId>
+            <artifactId>neo4j-lucene-index</artifactId>
+            <version>2.2.1</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j</groupId>
+            <artifactId>neo4j-kernel</artifactId>
+            <version>2.2.1</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j</groupId>
+            <artifactId>neo4j-io</artifactId>
+            <version>2.2.1</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
     <build>
         <sourceDirectory>src/main/java</sourceDirectory>
@@ -118,6 +166,55 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <id>resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeArtifactIds>streams-pojo</includeArtifactIds>
+                            <includes>**/*.json</includes>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!-- revisit using this if streams bumps to jdk8 -->
+            <!--<plugin>-->
+                <!--<groupId>com.github.harti2006</groupId>-->
+                <!--<artifactId>neo4j-server-maven-plugin</artifactId>-->
+                <!--<version>0.1</version>-->
+                <!--<configuration>-->
+                    <!--<port>7474</port>-->
+                    <!--<version>2.2.1</version>-->
+                <!--</configuration>-->
+                <!--<executions>-->
+                    <!--<execution>-->
+                        <!--<id>start-neo4j-server</id>-->
+                        <!--<phase>-->
+                            <!--pre-integration-test-->
+                        <!--</phase>-->
+                        <!--<goals>-->
+                            <!--<goal>start</goal>-->
+                        <!--</goals>-->
+                    <!--</execution>-->
+                    <!--<execution>-->
+                        <!--<id>stop-neo4j-server</id>-->
+                        <!--<phase>-->
+                            <!--post-integration-test-->
+                        <!--</phase>-->
+                        <!--<goals>-->
+                            <!--<goal>stop</goal>-->
+                        <!--</goals>-->
+                    <!--</execution>-->
+                <!--</executions>-->
+            <!--</plugin>-->
         </plugins>
     </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
new file mode 100644
index 0000000..71e1648
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.graph.neo4j.Neo4jHttpGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Adds activityobjects as vertices and activities as edges to a graph database with
+ * an http rest endpoint (such as neo4j)
+ */
+public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
+
+    public static final String STREAMS_ID = GraphHttpPersistWriter.class.getCanonicalName();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GraphHttpPersistWriter.class);
+    private final static long MAX_WRITE_LATENCY = 1000;
+
+    protected GraphHttpConfiguration configuration;
+
+    protected QueryGraphHelper queryGraphHelper;
+    protected HttpGraphHelper httpGraphHelper;
+
+    private static ObjectMapper mapper;
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public GraphHttpPersistWriter() {
+        this(new ComponentConfigurator<GraphHttpConfiguration>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+    }
+
+    public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
+        super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class));
+        if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
+            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
+        }
+        else if( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
+            super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
+        }
+        this.configuration = configuration;
+    }
+
+    @Override
+    protected ObjectNode preparePayload(StreamsDatum entry) {
+
+        Activity activity = null;
+
+        if (entry.getDocument() instanceof Activity) {
+            activity = (Activity) entry.getDocument();
+        } else if (entry.getDocument() instanceof ObjectNode) {
+            activity = mapper.convertValue(entry.getDocument(), Activity.class);
+        } else if (entry.getDocument() instanceof String) {
+            try {
+                activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+            } catch (Throwable e) {
+                LOGGER.warn(e.getMessage());
+            }
+        }
+
+        Preconditions.checkNotNull(activity);
+
+        ObjectNode request = mapper.createObjectNode();
+        ArrayNode statements = mapper.createArrayNode();
+
+        activity.getActor().setObjectType("page");
+
+        // always add vertices first
+
+        List<String> labels = Lists.newArrayList();
+        if( activity.getProvider() != null &&
+                !Strings.isNullOrEmpty(activity.getProvider().getId()) ) {
+            labels.add(activity.getProvider().getId());
+        }
+
+        if( activity.getActor() != null &&
+                !Strings.isNullOrEmpty(activity.getActor().getId()) ) {
+            if( activity.getActor().getObjectType() != null )
+                labels.add(activity.getActor().getObjectType());
+            statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activity.getActor())));
+        }
+
+        if( activity.getObject() != null &&
+                !Strings.isNullOrEmpty(activity.getObject().getId()) ) {
+            if( activity.getObject().getObjectType() != null )
+                labels.add(activity.getObject().getObjectType());
+            statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activity.getObject())));
+        }
+
+        // then add edge
+
+        if( !Strings.isNullOrEmpty(activity.getVerb()) ) {
+            statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity)));
+        }
+
+
+
+        request.put("statements", statements);
+        return request;
+
+    }
+
+    @Override
+    protected ObjectNode executePost(HttpPost httpPost) {
+
+        Preconditions.checkNotNull(httpPost);
+
+        ObjectNode result = null;
+
+        CloseableHttpResponse response = null;
+
+        String entityString = null;
+        try {
+            response = httpclient.execute(httpPost);
+            HttpEntity entity = response.getEntity();
+            if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) {
+                entityString = EntityUtils.toString(entity);
+                result = mapper.readValue(entityString, ObjectNode.class);
+            }
+            LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString);
+            if( result == null ||
+                    (
+                        result.get("errors") != null &&
+                        result.get("errors").isArray() &&
+                        result.get("errors").iterator().hasNext()
+                    )
+                ) {
+                LOGGER.error("Write Error: " + result.get("errors"));
+            } else {
+                LOGGER.info("Write Success");
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
+        } catch (Exception e) {
+            LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
+        } finally {
+            try {
+                if( response != null) response.close();
+            } catch (IOException e) {}
+        }
+        return result;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        super.prepare(configurationObject);
+        mapper = StreamsJacksonMapper.getInstance();
+
+        if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
+            queryGraphHelper = new CypherQueryGraphHelper();
+            httpGraphHelper = new Neo4jHttpGraphHelper();
+        }
+
+        Preconditions.checkNotNull(queryGraphHelper);
+        Preconditions.checkNotNull(httpGraphHelper);
+    }
+
+    @Override
+    public void cleanUp() {
+
+        LOGGER.info("exiting");
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
deleted file mode 100644
index 828319f..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.graph.neo4j.CypherGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Adds activityobjects as vertices and activities as edges to a graph database with
- * an http rest endpoint (such as neo4j)
- */
-public class GraphPersistWriter extends SimpleHTTPPostPersistWriter {
-
-    public static final String STREAMS_ID = GraphPersistWriter.class.getCanonicalName();
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(GraphPersistWriter.class);
-    private final static long MAX_WRITE_LATENCY = 1000;
-
-    protected GraphWriterConfiguration configuration;
-
-    protected GraphHelper graphHelper;
-
-    private static ObjectMapper mapper;
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    public GraphPersistWriter() {
-        this(new ComponentConfigurator<GraphWriterConfiguration>(GraphWriterConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
-    }
-
-    public GraphPersistWriter(GraphWriterConfiguration configuration) {
-        super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class));
-        if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) {
-            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
-        }
-        else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER)) {
-            super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
-        }
-        this.configuration = configuration;
-    }
-
-    @Override
-    protected ObjectNode preparePayload(StreamsDatum entry) {
-
-        Activity activity = null;
-
-        if (entry.getDocument() instanceof Activity) {
-            activity = (Activity) entry.getDocument();
-        } else if (entry.getDocument() instanceof ObjectNode) {
-            activity = mapper.convertValue(entry.getDocument(), Activity.class);
-        } else if (entry.getDocument() instanceof String) {
-            try {
-                activity = mapper.readValue((String) entry.getDocument(), Activity.class);
-            } catch (Throwable e) {
-                LOGGER.warn(e.getMessage());
-            }
-        }
-
-        Preconditions.checkNotNull(activity);
-
-        ObjectNode request = mapper.createObjectNode();
-        ArrayNode statements = mapper.createArrayNode();
-
-        activity.getActor().setObjectType("page");
-
-        // always add vertices first
-        // what types of verbs are relevant for adding vertices?
-        if( configuration.getVertices().getVerbs().contains(activity.getVerb())) {
-
-            // what objects and objectTypes are relevant for adding vertices?
-            if( configuration.getVertices().getObjects().contains("actor") &&
-                configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType())) {
-                statements.add(graphHelper.mergeVertexRequest(activity.getActor()));
-            }
-            if( configuration.getVertices().getObjects().contains("object") &&
-                configuration.getVertices().getObjectTypes().contains(activity.getObject().getObjectType())) {
-                statements.add(graphHelper.mergeVertexRequest(activity.getObject()));
-            }
-            if( configuration.getVertices().getObjects().contains("provider") &&
-                configuration.getVertices().getObjectTypes().contains(activity.getProvider().getObjectType())) {
-                statements.add(graphHelper.mergeVertexRequest(activity.getProvider()));
-            }
-            if( configuration.getVertices().getObjects().contains("target") &&
-                configuration.getVertices().getObjectTypes().contains(activity.getTarget().getObjectType())) {
-                statements.add(graphHelper.mergeVertexRequest(activity.getProvider()));
-            }
-
-        }
-
-        // what types of verbs are relevant for adding edges?
-        if( configuration.getEdges().getVerbs().contains(activity.getVerb())) {
-
-            // what objects and objectTypes are relevant for adding edges?
-            if( configuration.getEdges().getObjects().contains("actor") &&
-                configuration.getEdges().getObjects().contains("object") &&
-                configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) &&
-                configuration.getEdges().getObjectTypes().contains(activity.getObject().getObjectType())) {
-                statements.add(graphHelper.createEdgeRequest(activity, activity.getActor(), activity.getObject()));
-            }
-            if( configuration.getEdges().getObjects().contains("actor") &&
-                    configuration.getEdges().getObjects().contains("target") &&
-                    configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) &&
-                    configuration.getEdges().getObjectTypes().contains(activity.getTarget().getObjectType())) {
-                statements.add(graphHelper.createEdgeRequest(activity, activity.getActor(), activity.getTarget()));
-            }
-            if( configuration.getEdges().getObjects().contains("provider") &&
-                configuration.getEdges().getObjects().contains("actor") &&
-                configuration.getEdges().getObjectTypes().contains(activity.getProvider().getObjectType()) &&
-                configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())) {
-                statements.add(graphHelper.createEdgeRequest(activity, activity.getProvider(), activity.getActor()));
-            }
-        }
-
-        request.put("statements", statements);
-        return request;
-
-    }
-
-    @Override
-    protected ObjectNode executePost(HttpPost httpPost) {
-
-        Preconditions.checkNotNull(httpPost);
-
-        ObjectNode result = null;
-
-        CloseableHttpResponse response = null;
-
-        String entityString = null;
-        try {
-            response = httpclient.execute(httpPost);
-            HttpEntity entity = response.getEntity();
-            if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) {
-                entityString = EntityUtils.toString(entity);
-                result = mapper.readValue(entityString, ObjectNode.class);
-            }
-            LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString);
-            if( result == null ||
-                    (
-                        result.get("errors") != null &&
-                        result.get("errors").isArray() &&
-                        result.get("errors").iterator().hasNext()
-                    )
-                ) {
-                LOGGER.error("Write Error: " + result.get("errors"));
-            } else {
-                LOGGER.info("Write Success");
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
-        } catch (Exception e) {
-            LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
-        } finally {
-            try {
-                if( response != null) response.close();
-            } catch (IOException e) {}
-        }
-        return result;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-        super.prepare(configurationObject);
-        mapper = StreamsJacksonMapper.getInstance();
-
-        if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) {
-            graphHelper = new CypherGraphHelper();
-        }
-
-        Preconditions.checkNotNull(graphHelper);
-    }
-
-    @Override
-    public void cleanUp() {
-
-        LOGGER.info("exiting");
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
index 5910136..93041d8 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
@@ -58,9 +58,9 @@ public class GraphVertexReader extends SimpleHttpProvider implements StreamsPers
 
     public GraphVertexReader(GraphReaderConfiguration configuration) {
         super(mapper.convertValue(configuration, HttpProviderConfiguration.class));
-        if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J))
+        if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J))
             super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
-        else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER))
+        else if( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER))
             super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
         this.configuration = configuration;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
new file mode 100644
index 0000000..0833ba0
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+
+import java.util.Map;
+
+/**
+ * Interface for methods allowing persistence to a graph database wrapped with
+ * a REST API.  Neo4jHttpGraphHelper is a good example, for neo4j.
+ */
+public interface HttpGraphHelper {
+
+    public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
new file mode 100644
index 0000000..eeacdae
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+
+import java.util.Map;
+
+/**
+ * Contract for helpers that persist to a graph database driven by a query
+ * language.  Every method returns the query text (value0) together with the
+ * parameters that accompany it (value1).  CypherQueryGraphHelper is the
+ * implementation for neo4j.
+ */
+public interface QueryGraphHelper {
+
+    /**
+     * @param streamsId id of the vertex, as a String
+     * @return request that fetches the vertex
+     */
+    Pair<String, Map<String, Object>> getVertexRequest(String streamsId);
+
+    /**
+     * @param vertexId id of the vertex, as a Long
+     * @return request that fetches the vertex
+     */
+    Pair<String, Map<String, Object>> getVertexRequest(Long vertexId);
+
+    /**
+     * @param activityObject document to store as a vertex
+     * @return request that creates the vertex
+     */
+    Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject);
+
+    /**
+     * @param activityObject document to store as a vertex
+     * @return request that merges the vertex
+     */
+    Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject);
+
+    /**
+     * @param activity document to store as an edge
+     * @return request that creates the edge
+     */
+    Pair<String, Map<String, Object>> createEdgeRequest(Activity activity);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
new file mode 100644
index 0000000..3dc8ffc
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+import org.javatuples.Quartet;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Supporting class that converts streams documents into the tuples used when
+ * writing vertices and edges to a neo4j graph: a {@link Pair} of id and
+ * flattened properties for a vertex, a {@link Quartet} of source id,
+ * destination id, edge id and flattened properties for an edge.
+ */
+public class BinaryGraphHelper {
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /**
+     * Builds the tuple describing a vertex.
+     *
+     * @param activityObject document to convert; its objectType must not be null
+     * @return the value found under "id" in the flattened properties (as a String,
+     *         or null when absent) paired with those properties
+     * @throws NullPointerException if the objectType is null
+     */
+    public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        // keep the tuple honest about its declared String id instead of relying on a raw type
+        Object id = props.get("id");
+        String vertexId = (id == null) ? null : id.toString();
+
+        return new Pair<String, Map<String, Object>>(vertexId, props);
+    }
+
+    /**
+     * Builds the tuple describing an edge.
+     *
+     * @param activity document to convert; actor and object must not be null
+     * @return actor id, object id, activity id and the flattened activity properties
+     * @throws NullPointerException if the actor or the object is null
+     */
+    public Quartet<String, String, String, Map<String, Object>> createEdgeRequest(Activity activity) {
+
+        Preconditions.checkNotNull(activity.getActor(), "activity.actor");
+        Preconditions.checkNotNull(activity.getObject(), "activity.object");
+
+        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        return new Quartet<String, String, String, Map<String, Object>>(
+                activity.getActor().getId(),
+                activity.getObject().getId(),
+                activity.getId(),
+                props);
+    }
+
+    /**
+     * Renders one {@code ,symbol.`key` = 'value'} fragment per String-valued entry.
+     * Entries whose value is not a String are skipped.
+     *
+     * @param map    properties to render
+     * @param symbol identifier the properties are set on
+     * @return the concatenated fragments, empty when there is no String value
+     */
+    public static String getPropertyValueSetter(Map<String, Object> map, String symbol) {
+        StringBuilder builder = new StringBuilder();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                builder.append(",").append(symbol)
+                       .append(".`").append(entry.getKey()).append("` = '")
+                       .append(escapeQuotedValue((String) entry.getValue()))
+                       .append("'");
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Produces exactly the same text as {@link #getPropertyValueSetter(Map, String)}.
+     * NOTE(review): despite the name, values are inlined rather than referenced as
+     * parameters - confirm whether that is intended.
+     *
+     * @param map    properties to render
+     * @param symbol identifier the properties are set on
+     * @return the concatenated fragments, empty when there is no String value
+     */
+    public static String getPropertyParamSetter(Map<String, Object> map, String symbol) {
+        return getPropertyValueSetter(map, symbol);
+    }
+
+    /**
+     * Renders the String-valued entries as a {@code {`key`:'value',...}} literal.
+     * Entries whose value is not a String are skipped.
+     *
+     * @param map properties to render
+     * @return the literal, {@code {}} when there is no String value
+     */
+    public static String getPropertyCreater(Map<String, Object> map) {
+        List<String> parts = Lists.newArrayList();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                parts.add("`" + entry.getKey() + "`:'" + escapeQuotedValue((String) entry.getValue()) + "'");
+            }
+        }
+        return "{" + Joiner.on(",").join(parts) + "}";
+    }
+
+    /**
+     * Escapes backslashes and single quotes so the value cannot terminate the
+     * single-quoted literal it is placed in.
+     */
+    private static String escapeQuotedValue(String value) {
+        return value.replace("\\", "\\\\").replace("'", "\\'");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
new file mode 100644
index 0000000..472a762
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Supporting class that builds cypher statements, and the parameters that go
+ * with them, for interacting with neo4j.
+ */
+public class CypherQueryGraphHelper implements QueryGraphHelper {
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    public final static String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v";
+    public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
+
+    public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
+                                                                "CREATE UNIQUE (n:<type> { props }) "+
+                                                                "RETURN n";
+
+    public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
+                                                               "ON CREATE SET v:<type>, v = { props }, v.`@timestamp` = timestamp() "+
+                                                               "ON MATCH SET v = { props }, v.`@timestamp` = timestamp() "+
+                                                               "RETURN v";
+
+    public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
+                                                            "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
+                                                            "RETURN r";
+
+    /**
+     * Builds the statement that fetches a vertex by its "id" property.
+     *
+     * @param streamsId value of the "id" property to match
+     * @return the statement with an empty parameter map
+     */
+    @Override
+    public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
+
+        ST getVertex = new ST(getVertexStringIdStatementTemplate);
+        getVertex.add("id", escapeQuotedValue(streamsId));
+
+        // an empty map rather than null: Neo4jHttpGraphHelper rejects null parameters
+        return newRequest(getVertex.render(), Maps.<String, Object>newHashMap());
+    }
+
+    /**
+     * Builds the statement that fetches a vertex by {@code ID(v)}.
+     *
+     * @param vertexId id to match
+     * @return the statement with an empty parameter map
+     */
+    @Override
+    public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
+
+        ST getVertex = new ST(getVertexLongIdStatementTemplate);
+        getVertex.add("id", vertexId);
+
+        // an empty map rather than null: Neo4jHttpGraphHelper rejects null parameters
+        return newRequest(getVertex.render(), Maps.<String, Object>newHashMap());
+    }
+
+    /**
+     * Builds the create-vertex statement for a document.
+     *
+     * @param activityObject document to store; its objectType must not be null
+     * @return the statement paired with the flattened document properties
+     * @throws NullPointerException if the objectType is null
+     */
+    @Override
+    public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ST createVertex = new ST(createVertexStatementTemplate);
+        createVertex.add("id", escapeQuotedValue(activityObject.getId()));
+        createVertex.add("type", activityObject.getObjectType());
+
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        return newRequest(createVertex.render(), props);
+    }
+
+    /**
+     * Builds the merge-vertex statement for a document.
+     *
+     * @param activityObject document to store; its objectType must not be null
+     * @return the statement paired with the flattened document properties
+     * @throws NullPointerException if the objectType is null
+     */
+    @Override
+    public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ST mergeVertex = new ST(mergeVertexStatementTemplate);
+        mergeVertex.add("id", escapeQuotedValue(activityObject.getId()));
+        mergeVertex.add("type", activityObject.getObjectType());
+
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        // tuples are immutable (setAtN returns a copy), so the result is built in one step
+        return newRequest(mergeVertex.render(), props);
+    }
+
+    /**
+     * Builds the create-edge statement linking an activity's actor to its object.
+     * As a side effect the actor, object, target and "extensions" entry of the
+     * supplied activity are set to null.
+     *
+     * @param activity document to store; actor and object must not be null
+     * @return the statement paired with the flattened activity properties
+     * @throws NullPointerException if the actor or the object is null
+     */
+    @Override
+    public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) {
+
+        Preconditions.checkNotNull(activity.getActor(), "activity.actor");
+        Preconditions.checkNotNull(activity.getObject(), "activity.object");
+
+        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        ST mergeEdge = new ST(createEdgeStatementTemplate);
+        mergeEdge.add("s_id", escapeQuotedValue(activity.getActor().getId()));
+        mergeEdge.add("s_type", activity.getActor().getObjectType());
+        mergeEdge.add("d_id", escapeQuotedValue(activity.getObject().getId()));
+        mergeEdge.add("d_type", activity.getObject().getObjectType());
+        mergeEdge.add("r_id", activity.getId());
+        mergeEdge.add("r_type", activity.getVerb());
+        mergeEdge.add("r_props", getPropertyCreater(props));
+
+        // set the activityObject's and extensions null, because their properties don't need to appear on the relationship
+        // NOTE(review): props was flattened above, so this does not remove anything from
+        // the relationship properties; it only mutates the caller's activity - confirm intent.
+        activity.setActor(null);
+        activity.setObject(null);
+        activity.setTarget(null);
+        activity.getAdditionalProperties().put("extensions", null);
+
+        // tuples are immutable (setAtN returns a copy), so the result is built in one step
+        return newRequest(mergeEdge.render(), props);
+    }
+
+    /**
+     * Renders one {@code ,symbol.`key` = 'value'} fragment per String-valued entry.
+     * Entries whose value is not a String are skipped.
+     *
+     * @param map    properties to render
+     * @param symbol identifier the properties are set on
+     * @return the concatenated fragments, empty when there is no String value
+     */
+    public static String getPropertyValueSetter(Map<String, Object> map, String symbol) {
+        StringBuilder builder = new StringBuilder();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                builder.append(",").append(symbol)
+                       .append(".`").append(entry.getKey()).append("` = '")
+                       .append(escapeQuotedValue((String) entry.getValue()))
+                       .append("'");
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Produces exactly the same text as {@link #getPropertyValueSetter(Map, String)}.
+     * NOTE(review): despite the name, values are inlined rather than referenced as
+     * parameters - confirm whether that is intended.
+     *
+     * @param map    properties to render
+     * @param symbol identifier the properties are set on
+     * @return the concatenated fragments, empty when there is no String value
+     */
+    public static String getPropertyParamSetter(Map<String, Object> map, String symbol) {
+        return getPropertyValueSetter(map, symbol);
+    }
+
+    /**
+     * Renders the String-valued entries as a {@code {`key`:'value',...}} literal.
+     * Entries whose value is not a String are skipped.
+     *
+     * @param map properties to render
+     * @return the literal, {@code {}} when there is no String value
+     */
+    public static String getPropertyCreater(Map<String, Object> map) {
+        List<String> parts = Lists.newArrayList();
+        for( Map.Entry<String, Object> entry : map.entrySet()) {
+            if( entry.getValue() instanceof String ) {
+                parts.add("`" + entry.getKey() + "`:'" + escapeQuotedValue((String) entry.getValue()) + "'");
+            }
+        }
+        return "{" + Joiner.on(",").join(parts) + "}";
+    }
+
+    /** Pairs a rendered statement with its parameters. */
+    private static Pair<String, Map<String, Object>> newRequest(String statement, Map<String, Object> parameters) {
+        return new Pair<String, Map<String, Object>>(statement, parameters);
+    }
+
+    /**
+     * Escapes backslashes and single quotes so the value cannot terminate the
+     * single-quoted literal it is placed in.  Null is passed through.
+     */
+    private static String escapeQuotedValue(String value) {
+        if( value == null ) {
+            return null;
+        }
+        return value.replace("\\", "\\\\").replace("'", "\\'");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphPersistWriter.java
new file mode 100644
index 0000000..8ca5fff
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphPersistWriter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.graph.GraphBinaryConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+import org.javatuples.Quartet;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Label;
+import org.neo4j.graphdb.factory.GraphDatabaseFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Adds activityobjects as vertices and activities as edges to a graph database file which will be
+ * loaded inside of neo4j
+ */
+public class Neo4jBinaryGraphPersistWriter implements StreamsPersistWriter {
+
+    public static final String STREAMS_ID = Neo4jBinaryGraphPersistWriter.class.getCanonicalName();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(Neo4jBinaryGraphPersistWriter.class);
+    private final static long MAX_WRITE_LATENCY = 1000;
+
+    protected GraphBinaryConfiguration configuration;
+
+    private static ObjectMapper mapper;
+
+    public GraphDatabaseService graph;
+    private Neo4jBinaryGraphUtil graphutil;
+    private CypherQueryGraphHelper queryGraphHelper;
+    private BinaryGraphHelper binaryGraphHelper;
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public Neo4jBinaryGraphPersistWriter() {
+        this(new ComponentConfigurator<GraphBinaryConfiguration>(GraphBinaryConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+    }
+
+    public Neo4jBinaryGraphPersistWriter(GraphBinaryConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        mapper = StreamsJacksonMapper.getInstance();
+
+        boolean newGraph = true;
+        if( FileUtils.getFile(configuration.getFile()).canRead())
+            newGraph = false;
+
+        graph = new GraphDatabaseFactory().newEmbeddedDatabase(configuration.getFile());
+
+        graphutil = new Neo4jBinaryGraphUtil();
+
+        queryGraphHelper = new CypherQueryGraphHelper();
+
+        binaryGraphHelper = new BinaryGraphHelper();
+
+        String globalLabel = "streams";
+
+        if( newGraph ) {
+            graphutil.addUniqueIndex(graph, globalLabel, "id", false);
+            for( String field: configuration.getIndexFields()) {
+                graphutil.addUniqueIndex(graph, globalLabel, field, false);
+            }
+        }
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+        LOGGER.info("exiting");
+
+    }
+
+    @Override
+    public void write(StreamsDatum entry) {
+
+        Activity activity = null;
+        ActivityObject activityObject = null;
+
+        if (entry.getDocument() instanceof Activity) {
+            activity = (Activity) entry.getDocument();
+        } if (entry.getDocument() instanceof ActivityObject) {
+            activityObject = (ActivityObject) entry.getDocument();
+        } else if (entry.getDocument() instanceof ObjectNode) {
+            try {
+                activity = mapper.convertValue(entry.getDocument(), Activity.class);
+            } catch( Exception e ) {
+                activityObject = mapper.convertValue(entry.getDocument(), ActivityObject.class);
+            }
+        } else if (entry.getDocument() instanceof String) {
+            try {
+                activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+            } catch (Throwable e1) {
+                try {
+                    activityObject = mapper.readValue((String) entry.getDocument(), ActivityObject.class);
+                } catch( Exception e2 ) {
+                    LOGGER.error("Can't handle input: ", e2);
+                }
+            }
+        }
+
+        Preconditions.checkArgument(activity != null || activityObject != null);
+
+        List<String> labels = Lists.newArrayList("streams");
+
+        if( activityObject != null ) {
+            if (activityObject.getObjectType() != null)
+                labels.add(activityObject.getObjectType());
+            Pair<String, Map<String, Object>> addNode = binaryGraphHelper.createVertexRequest(activityObject);
+            graphutil.addNode(
+                    graph,
+                    labels,
+                    addNode);
+        } else if( activity != null ) {
+
+            // always add vertices first
+
+            if (activity.getProvider() != null &&
+                    !Strings.isNullOrEmpty(activity.getProvider().getId())) {
+                labels.add(activity.getProvider().getId());
+            }
+            if (activity.getActor() != null &&
+                    !Strings.isNullOrEmpty(activity.getActor().getId())) {
+                if (activity.getActor().getObjectType() != null)
+                    labels.add(activity.getActor().getObjectType());
+                Pair<String, Map<String, Object>> addNode = binaryGraphHelper.createVertexRequest(activity.getActor());
+                graphutil.addNode(
+                        graph,
+                        labels,
+                        addNode);
+            }
+
+            if (activity.getObject() != null &&
+                    !Strings.isNullOrEmpty(activity.getObject().getId())) {
+                if (activity.getObject().getObjectType() != null)
+                    labels.add(activity.getObject().getObjectType());
+                Pair<String, Map<String, Object>> addNode = binaryGraphHelper.createVertexRequest(activity.getObject());
+                graphutil.addNode(
+                        graph,
+                        labels,
+                        addNode);
+            }
+
+            // then add edge
+
+            if (!Strings.isNullOrEmpty(activity.getVerb())) {
+                if (activity.getVerb() != null)
+                    labels.add(activity.getVerb());
+                Quartet<String, String, String, Map<String, Object>> addRelationship = binaryGraphHelper.createEdgeRequest(activity);
+                graphutil.addRelationship(
+                        graph,
+                        labels,
+                        addRelationship);
+            }
+
+        }
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphUtil.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphUtil.java
new file mode 100644
index 0000000..086b443
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphUtil.java
@@ -0,0 +1,95 @@
+package org.apache.streams.graph.neo4j;
+
+import com.google.common.collect.Lists;
+import org.javatuples.Pair;
+import org.javatuples.Quartet;
+import org.neo4j.graphdb.Direction;
+import org.neo4j.graphdb.DynamicLabel;
+import org.neo4j.graphdb.DynamicRelationshipType;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Label;
+import org.neo4j.graphdb.Node;
+import org.neo4j.graphdb.Result;
+import org.neo4j.graphdb.Transaction;
+import org.neo4j.graphdb.schema.Schema;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Transactional helpers for adding indexes, nodes and relationships to a
+ * neo4j {@link GraphDatabaseService}.
+ */
+public class Neo4jBinaryGraphUtil {
+
+    private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Neo4jBinaryGraphUtil.class);
+
+    CypherQueryGraphHelper queryGraphHelper = new CypherQueryGraphHelper();
+
+    /**
+     * Adds a uniqueness constraint on a property of a label.
+     *
+     * @param graph    database to modify
+     * @param label    label the constraint applies to
+     * @param property property that must be unique
+     * @param wait     when true, block (up to two hours) until indexes are online
+     */
+    protected void addUniqueIndex(GraphDatabaseService graph, String label, String property, Boolean wait ) {
+        Transaction tx = graph.beginTx();
+        try {
+            Schema schema = graph.schema();
+            schema
+                .constraintFor(DynamicLabel.label(label))
+                .assertPropertyIsUnique(property)
+                .create();
+            tx.success();
+        } finally {
+            // close exactly once; without success() the transaction is rolled back
+            tx.close();
+        }
+        awaitIndexes(graph, wait);
+    }
+
+    /**
+     * Adds an index on a property of a label.
+     *
+     * @param graph    database to modify
+     * @param label    label the index applies to
+     * @param property property to index
+     * @param wait     when true, block (up to two hours) until indexes are online
+     */
+    protected void addIndex(GraphDatabaseService graph, String label, String property, Boolean wait) {
+        Transaction tx = graph.beginTx();
+        try {
+            Schema schema = graph.schema();
+            schema
+                .indexFor(DynamicLabel.label(label))
+                .on(property)
+                .create();
+            tx.success();
+        } finally {
+            // close exactly once; without success() the transaction is rolled back
+            tx.close();
+        }
+        awaitIndexes(graph, wait);
+    }
+
+    /** Waits for indexes in a fresh transaction, after the schema change has been committed. */
+    private void awaitIndexes(GraphDatabaseService graph, Boolean wait) {
+        if (!Boolean.TRUE.equals(wait)) {
+            return;
+        }
+        Transaction tx = graph.beginTx();
+        try {
+            graph.schema().awaitIndexesOnline(2L, TimeUnit.HOURS);
+            tx.success();
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * Creates a node carrying the given labels, an "id" property and every supplied
+     * property.  Failures are logged and the transaction rolled back; nothing is thrown.
+     *
+     * @param graph                database to modify
+     * @param labelStringList      labels to put on the node
+     * @param nodeIdPlusProperties node id (value0) and properties (value1)
+     */
+    protected void addNode(GraphDatabaseService graph, List<String> labelStringList, Pair<String, Map<String, Object>> nodeIdPlusProperties) {
+        Transaction tx = graph.beginTx();
+        List<Label> labelList = Lists.newArrayList();
+        for( String labelString : labelStringList ) {
+            labelList.add(DynamicLabel.label(labelString));
+        }
+        try {
+            Node node = graph.createNode(labelList.toArray(new Label[0]));
+            node.setProperty("id", nodeIdPlusProperties.getValue0());
+            for( Map.Entry<String, Object> property : nodeIdPlusProperties.getValue1().entrySet()) {
+                node.setProperty(property.getKey(), property.getValue());
+            }
+            tx.success();
+        } catch(Exception e) {
+            // best effort: keep the stream running, but leave a trace of what was dropped
+            LOGGER.warn("Unable to add node: " + nodeIdPlusProperties.getValue0(), e);
+            tx.failure();
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * Creates a relationship between two existing nodes.  The first label is used to
+     * look both nodes up by "id"; the last label becomes the relationship type.
+     * Failures are logged and the transaction rolled back; nothing is thrown.
+     *
+     * NOTE(review): the edge id (value2) and properties (value3) are not written to
+     * the relationship - confirm whether that is intended.
+     *
+     * @param graph                         database to modify
+     * @param labelStringList               lookup label first, relationship type last
+     * @param relationshipIdsPlusProperties source id (value0), destination id (value1),
+     *                                      edge id (value2) and properties (value3)
+     */
+    protected void addRelationship(GraphDatabaseService graph, List<String> labelStringList, Quartet<String, String, String, Map<String, Object>> relationshipIdsPlusProperties) {
+        Transaction tx = graph.beginTx();
+        try {
+            Label lookupLabel = DynamicLabel.label(labelStringList.get(0));
+            Node source = graph.findNodesByLabelAndProperty(lookupLabel, "id", relationshipIdsPlusProperties.getValue0()).iterator().next();
+            Node destination = graph.findNodesByLabelAndProperty(lookupLabel, "id", relationshipIdsPlusProperties.getValue1()).iterator().next();
+            source.createRelationshipTo(destination, DynamicRelationshipType.withName(labelStringList.get(labelStringList.size() - 1)));
+            tx.success();
+        } catch(Exception e) {
+            // best effort: keep the stream running, but leave a trace of what was dropped
+            LOGGER.warn("Unable to add relationship: " + relationshipIdsPlusProperties.getValue0()
+                    + " -> " + relationshipIdsPlusProperties.getValue1(), e);
+            tx.failure();
+        } finally {
+            tx.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
new file mode 100644
index 0000000..a076cff
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Neo4j implementation of {@link HttpGraphHelper}: packages a statement and its
+ * parameters as the JSON payload used with the neo4j rest API.
+ */
+public class Neo4jHttpGraphHelper implements HttpGraphHelper {
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    public final static String statementKey = "statement";
+    public final static String paramsKey = "parameters";
+    public final static String propsKey = "props";
+
+    /**
+     * Builds {@code {"statement": <query>, "parameters": {"props": <parameters>}}}.
+     *
+     * @param queryPlusParameters the statement (value0) and its parameters (value1);
+     *                            neither the pair nor its values may be null
+     * @return the request payload
+     * @throws NullPointerException if the pair, the statement or the parameters are null
+     */
+    @Override
+    public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) {
+
+        Preconditions.checkNotNull(queryPlusParameters);
+        String statement = Preconditions.checkNotNull(queryPlusParameters.getValue0());
+        Map<String, Object> parameters = Preconditions.checkNotNull(queryPlusParameters.getValue1());
+
+        // the parameters travel nested under the "props" key
+        ObjectNode wrappedProps = mapper.createObjectNode();
+        wrappedProps.put(propsKey, mapper.convertValue(parameters, ObjectNode.class));
+
+        ObjectNode body = mapper.createObjectNode();
+        body.put(statementKey, statement);
+        body.put(paramsKey, wrappedProps);
+
+        return body;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
new file mode 100644
index 0000000..04a70e1
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
@@ -0,0 +1,28 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphBinaryConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "type": {
+            "type": "string",
+            "description": "Graph DB type",
+            "enum" : ["neo4j", "gremlin"]
+        },
+        "file": {
+            "type": "string",
+            "description": "New Graph DB File"
+        },
+        "indexFields": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            },
+            "description": "Fields to index under streams label"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
deleted file mode 100644
index f14f52c..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphEdgeWriterConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "objects": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        },
-        "verbs": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        },
-        "objectTypes": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
new file mode 100644
index 0000000..c63e0fb
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
@@ -0,0 +1,22 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.graph.GraphHttpConfiguration",
+    "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "type": {
+            "type": "string",
+            "description": "Graph DB type",
+            "enum" : ["neo4j", "rexster"]
+        },
+        "graph": {
+            "type": "string",
+            "description": "Graph DB Graph ID"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
index ca588f3..6cffba0 100644
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
@@ -6,7 +6,7 @@
     ],
     "id": "#",
     "javaType" : "org.apache.streams.graph.GraphReaderConfiguration",
-    "extends" : {"$ref":"GraphConfiguration.json"},
+    "extends" : {"$ref":"GraphHttpConfiguration.json"},
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
         "query": {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
deleted file mode 100644
index 66fc236..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphVertexWriterConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "objects": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        },
-        "verbs": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        },
-        "objectTypes": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
deleted file mode 100644
index 0c7e304..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphWriterConfiguration",
-    "extends" : {"$ref":"GraphConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "vertices": {
-            "type": "object",
-            "javaType": "org.apache.streams.graph.GraphVertexWriterConfiguration"
-        },
-        "edges": {
-            "type": "object",
-            "javaType": "org.apache.streams.graph.GraphEdgeWriterConfiguration"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/Neo4jHttpPersistWriterIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/Neo4jHttpPersistWriterIT.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/Neo4jHttpPersistWriterIT.java
new file mode 100644
index 0000000..d5105ec
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/Neo4jHttpPersistWriterIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.GraphHttpConfiguration;
+import org.apache.streams.graph.GraphHttpPersistWriter;
+import org.apache.streams.graph.GraphVertexReader;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.test.TestGraphDatabaseFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Integration test for {@link GraphHttpPersistWriter}.
+ *
+ * Writes the bundled test activities to neo4j through its REST API. Ignored until a
+ * neo4j server can be launched during the verify step.
+ */
+@Ignore("Need to find a way to launch neo4j during verify step to use this")
+public class Neo4jHttpPersistWriterIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpPersistWriterIT.class);
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private GraphHttpConfiguration testConfiguration;
+
+    private GraphHttpPersistWriter graphPersistWriter;
+
+    @Before
+    public void prepareTest() throws IOException {
+        testConfiguration = new GraphHttpConfiguration();
+        testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
+        testConfiguration.setGraph("data");
+        testConfiguration.setHostname("localhost");
+        testConfiguration.setPort(7474L);
+        testConfiguration.setContentType("application/json");
+        testConfiguration.setProtocol("http");
+
+        graphPersistWriter = new GraphHttpPersistWriter(testConfiguration);
+        graphPersistWriter.prepare(testConfiguration);
+    }
+
+    @Test
+    public void testNeo4jHttpPersistWriter() throws IOException {
+
+        InputStream testActivityFolderStream = Neo4jHttpPersistWriterIT.class.getClassLoader()
+                .getResourceAsStream("activities");
+        List<String> files;
+        try {
+            files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+        } finally {
+            // readLines leaves the stream open, so release it here
+            IOUtils.closeQuietly(testActivityFolderStream);
+        }
+
+        for( String file : files) {
+            LOGGER.info("File: " + file );
+            InputStream testActivityFileStream = Neo4jHttpPersistWriterIT.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = mapper.readValue(testActivityFileStream, Activity.class);
+            activity.getActor().setId(activity.getActor().getObjectType());
+            activity.getObject().setId(activity.getObject().getObjectType());
+            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+            graphPersistWriter.write( datum );
+            LOGGER.info("Wrote: " + activity.getVerb() );
+        }
+
+        graphPersistWriter.cleanUp();
+        // TODO: hit neo with http and check vertex/edge counts
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jBinaryPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jBinaryPersistWriter.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jBinaryPersistWriter.java
new file mode 100644
index 0000000..8295734
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jBinaryPersistWriter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.GraphBinaryConfiguration;
+import org.apache.streams.graph.GraphReaderConfiguration;
+import org.apache.streams.graph.GraphVertexReader;
+import org.apache.streams.graph.neo4j.Neo4jBinaryGraphPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.neo4j.graphdb.Direction;
+import org.neo4j.graphdb.DynamicLabel;
+import org.neo4j.graphdb.DynamicRelationshipType;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Node;
+import org.neo4j.graphdb.Relationship;
+import org.neo4j.graphdb.RelationshipType;
+import org.neo4j.graphdb.factory.GraphDatabaseFactory;
+import org.reflections.Reflections;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Unit test for
+ * @see {@link GraphVertexReader}
+ *
+ * Test that graph db responses can be converted to streams data
+ */
+public class TestNeo4jBinaryPersistWriter {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestNeo4jBinaryPersistWriter.class);
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private final String testGraphFile = "target/graph.neo4j";
+
+    private GraphBinaryConfiguration testConfiguration;
+
+    private Neo4jBinaryGraphPersistWriter graphPersistWriter;
+
+    @Before
+    public void prepareTest() throws IOException {
+
+        testConfiguration = new GraphBinaryConfiguration();
+        testConfiguration.setType(GraphBinaryConfiguration.Type.NEO_4_J);
+        testConfiguration.setFile(testGraphFile);
+
+        graphPersistWriter = new Neo4jBinaryGraphPersistWriter(testConfiguration);
+
+        graphPersistWriter.prepare(testConfiguration);
+
+        assert(graphPersistWriter.graph.isAvailable(5000));
+
+    }
+
+    @Test
+    public void testNeo4jBinaryPersistWriter() throws Exception {
+
+        InputStream testActivityFolderStream = TestNeo4jBinaryPersistWriter.class.getClassLoader()
+                .getResourceAsStream("activities");
+        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+        for( String file : files) {
+            LOGGER.info("File: " + file);
+            InputStream testActivityFileStream = TestNeo4jBinaryPersistWriter.class.getClassLoader()
+                    .getResourceAsStream("activities/" + file);
+            Activity activity = mapper.readValue(testActivityFileStream, Activity.class);
+            activity.getActor().setId(activity.getActor().getObjectType());
+            activity.getObject().setId(activity.getObject().getObjectType());
+            if( !Strings.isNullOrEmpty((String)activity.getObject().getAdditionalProperties().get("verb"))) {
+                activity.getObject().setObjectType((String) activity.getObject().getAdditionalProperties().get("verb"));
+                activity.getObject().setId(activity.getObject().getObjectType());
+            }
+            if( !Strings.isNullOrEmpty(activity.getActor().getId())) {
+                StreamsDatum actorDatum = new StreamsDatum(activity.getActor(), activity.getActor().getId());
+                graphPersistWriter.write( actorDatum );
+            }
+            if( !Strings.isNullOrEmpty(activity.getObject().getId())) {
+                StreamsDatum objectDatum = new StreamsDatum(activity.getObject(), activity.getObject().getId());
+                graphPersistWriter.write( objectDatum );
+            }
+            if( !Strings.isNullOrEmpty(activity.getVerb()) &&
+                !Strings.isNullOrEmpty(activity.getActor().getId()) &&
+                !Strings.isNullOrEmpty(activity.getObject().getId())) {
+                StreamsDatum activityDatum = new StreamsDatum(activity, activity.getVerb());
+                graphPersistWriter.write( activityDatum );
+            }
+            LOGGER.info("Wrote: " + activity.getVerb());
+        }
+
+        graphPersistWriter.cleanUp();
+
+        graphPersistWriter.graph.beginTx();
+        Node organization = graphPersistWriter.graph.findNodes(DynamicLabel.label("streams"), "id", "organization").next();
+        Node person = graphPersistWriter.graph.findNodes(DynamicLabel.label("streams"), "id", "person").next();
+        Assert.assertNotNull(organization);
+        Assert.assertTrue(organization.hasLabel(DynamicLabel.label("streams")));
+        Assert.assertTrue(organization.hasLabel(DynamicLabel.label("organization")));
+        Assert.assertNotNull(person);
+        Assert.assertTrue(person.hasLabel(DynamicLabel.label("streams")));
+        Assert.assertTrue(person.hasLabel(DynamicLabel.label("person")));
+        Assert.assertTrue(person.hasRelationship());
+        Assert.assertTrue(person.hasRelationship(Direction.OUTGOING));
+        Assert.assertTrue(person.hasRelationship(DynamicRelationshipType.withName("join"), Direction.OUTGOING));
+        Assert.assertTrue(person.hasRelationship(DynamicRelationshipType.withName("leave"), Direction.OUTGOING));
+//        Iterable < Relationship > relationships = person.getRelationships(Direction.OUTGOING);
+//        List<Relationship> relationshipList = Lists.newArrayList(relationships);
+//        Assert.assertEquals(relationshipList.size(), 2);
+        Relationship joinRelationship = person.getSingleRelationship(DynamicRelationshipType.withName("join"), Direction.OUTGOING);
+        Assert.assertNotNull(joinRelationship);
+        Node joinRelationshipStart = joinRelationship.getStartNode();
+        Node joinRelationshipEnd = joinRelationship.getEndNode();
+        Assert.assertEquals(joinRelationshipStart, person);
+        Assert.assertEquals(joinRelationshipEnd, organization);
+        Relationship leaveRelationship = person.getSingleRelationship(DynamicRelationshipType.withName("leave"), Direction.OUTGOING);
+        Assert.assertNotNull(leaveRelationship);
+        Node leaveRelationshipStart = leaveRelationship.getStartNode();
+        Node leaveRelationshipEnd = leaveRelationship.getEndNode();
+        Assert.assertEquals(leaveRelationshipStart, person);
+        Assert.assertEquals(leaveRelationshipEnd, organization);
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
index ec2b1e5..8d76e90 100644
--- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.io.IOUtils;
-import org.apache.streams.graph.GraphConfiguration;
+import org.apache.streams.graph.GraphHttpConfiguration;
 import org.apache.streams.graph.GraphReaderConfiguration;
 import org.apache.streams.graph.GraphVertexReader;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -57,7 +57,7 @@ public class TestNeo4jVertexReader {
     public void prepareTest() throws IOException {
 
         testConfiguration = new GraphReaderConfiguration();
-        testConfiguration.setType(GraphConfiguration.Type.NEO_4_J);
+        testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
 
         graphPersistReader = new GraphVertexReader(testConfiguration);
         InputStream testActivityFileStream = TestNeo4jVertexReader.class.getClassLoader()