You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/01/08 18:36:29 UTC

[1/2] incubator-streams git commit: STREAMS:344: streams-persist-neo4j

Repository: incubator-streams
Updated Branches:
  refs/heads/master 7810361d2 -> 4bd22317e


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java
new file mode 100644
index 0000000..171dde4
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistReader.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.bolt;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Queues;
+
+import org.joda.time.DateTime;
+import org.neo4j.driver.internal.value.NodeValue;
+import org.neo4j.driver.internal.value.RelationshipValue;
+import org.neo4j.driver.internal.value.StringValue;
+import org.neo4j.driver.v1.Record;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.Transaction;
+import org.neo4j.driver.v1.Value;
+import org.neo4j.driver.v1.types.Node;
+import org.neo4j.driver.v1.types.Relationship;
+import org.neo4j.driver.v1.util.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Neo4jBoltPersistReader reads documents from neo4j.
+ */
+public class Neo4jBoltPersistReader implements StreamsPersistReader {
+
+  public static final String STREAMS_ID = "CassandraPersistReader";
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistReader.class);
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  private ExecutorService executor;
+  private CompletableFuture<Boolean> readerTaskFuture = new CompletableFuture<>();
+
+  private Neo4jReaderConfiguration config;
+
+  protected Neo4jBoltClient client;
+
+//  protected Cluster cluster;
+//  protected Session session;
+//
+//  protected String keyspace;
+//  protected String table;
+  protected StatementResult statementResult;
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  /**
+   * Neo4jBoltPersistReader constructor - resolves Neo4jReaderConfiguration from JVM 'neo4j'.
+   */
+  public Neo4jBoltPersistReader() {
+    this.config = new ComponentConfigurator<>(Neo4jReaderConfiguration.class)
+      .detectConfiguration(StreamsConfigurator.getConfig().getConfig("neo4j"));
+  }
+
+  /**
+   * Neo4jBoltPersistReader constructor - uses supplied Neo4jReaderConfiguration.
+   * @param config config
+   */
+  public Neo4jBoltPersistReader(Neo4jReaderConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * Neo4jBoltPersistReader constructor - uses supplied persistQueue.
+   * @param persistQueue persistQueue
+   */
+  public Neo4jBoltPersistReader(Queue<StreamsDatum> persistQueue) {
+    this();
+    this.persistQueue = persistQueue;
+  }
+
+  public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+    this.persistQueue = persistQueue;
+  }
+
+  public Queue<StreamsDatum> getPersistQueue() {
+    return persistQueue;
+  }
+
+  public void stop() {
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    if( configurationObject instanceof Neo4jReaderConfiguration ) {
+      this.config = (Neo4jReaderConfiguration) configurationObject;
+    }
+    this.client = Neo4jBoltClient.getInstance(this.config);
+
+    persistQueue = constructQueue();
+
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  @Override
+  public void cleanUp() {
+    stop();
+  }
+
+  protected Optional<StreamsDatum> buildDatum(Record record) {
+    ObjectNode objectNode;
+
+    if( record != null ) {
+      ObjectNode valueJson = null;
+      Map<String, ObjectNode> valueJsons = record.asMap(neo4jObjectNodeFunction);
+      if( valueJsons.size() == 1) {
+        valueJson = valueJsons.get(valueJsons.keySet().iterator().next());
+      }
+      objectNode = PropertyUtil.unflattenObjectNode(valueJson, '.');
+      return Optional.of(new StreamsDatum(objectNode));
+    }
+
+    return Optional.empty();
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+
+    Session session = null;
+
+    String query = config.getQuery();
+    Map<String, Object> params = mapper.convertValue(config.getParams(), Map.class);
+
+    try {
+      session = client.client().session();
+      Transaction transaction = session.beginTransaction();
+
+      this.statementResult = client.client().session().beginTransaction().run(query, params);
+
+      while( statementResult.hasNext()) {
+        Record record = statementResult.next();
+        Optional<StreamsDatum> datum = buildDatum(record);
+        if( datum.isPresent()) {
+          write(datum.get());
+        }
+      }
+
+    } catch(Exception ex) {
+      LOGGER.warn("Exception", ex);
+    } finally {
+      if( session != null ) {
+        session.close();
+      }
+    }
+    return readCurrent();
+
+  }
+
+  @Override
+  public void startStream() {
+    LOGGER.debug("startStream");
+    Neo4jBoltPersistReaderTask readerTask = new Neo4jBoltPersistReaderTask(this);
+
+    CompletableFuture.runAsync(readerTask, executor);
+
+    try {
+      if (readerTaskFuture.get()) {
+        executor.shutdown();
+      }
+    } catch (InterruptedException ex) {
+      LOGGER.trace("Interrupt", ex);
+    } catch (ExecutionException ex) {
+      LOGGER.trace("Execution exception", ex);
+    }
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+
+    StreamsResultSet current;
+
+    try {
+      lock.writeLock().lock();
+      current = new StreamsResultSet(persistQueue);
+      current.setCounter(new DatumStatusCounter());
+      persistQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    return current;
+  }
+
+  protected void write(StreamsDatum entry) {
+    boolean success;
+    do {
+      try {
+        lock.readLock().lock();
+        success = persistQueue.offer(entry);
+        Thread.yield();
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+    while (!success);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !executor.isTerminated() || !executor.isShutdown();
+  }
+
+  private Queue<StreamsDatum> constructQueue() {
+    return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+  }
+
+  private static String readAllStatement() {
+    return "MATCH (v:streams)";
+  }
+
+  public class Neo4jBoltPersistReaderTask implements Runnable {
+
+    private Neo4jBoltPersistReader reader;
+
+    public Neo4jBoltPersistReaderTask(Neo4jBoltPersistReader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (reader.statementResult.hasNext()) {
+          Record record = statementResult.next();
+          Optional<StreamsDatum> datum = reader.buildDatum(record);
+          if( datum.isPresent() ) {
+            reader.write(datum.get());
+          }
+        }
+      } finally {
+        readerTaskFuture.complete(true);
+      }
+    }
+  }
+
+  Function<Value, ObjectNode> neo4jObjectNodeFunction = new Function<Value, ObjectNode>() {
+
+    @Nullable
+    @Override
+    public ObjectNode apply(@Nullable Value value) {
+      ObjectNode resultNode = null;
+      if (value instanceof StringValue) {
+        StringValue stringValue = (StringValue) value;
+        String string = stringValue.asLiteralString();
+        try {
+          resultNode = mapper.readValue(string, ObjectNode.class);
+        } catch (IOException ex) {
+          LOGGER.error("IOException", ex);
+        }
+      } else if ( value instanceof NodeValue) {
+        NodeValue nodeValue = (NodeValue) value;
+        Node node = nodeValue.asNode();
+        Map<String, Object> nodeMap = node.asMap();
+        resultNode = PropertyUtil.unflattenMap(nodeMap, '.');
+      } else if (value instanceof RelationshipValue) {
+        RelationshipValue relationshipValue = (RelationshipValue) value;
+        Relationship relationship = relationshipValue.asRelationship();
+        Map<String, Object> relationshipMap = relationship.asMap();
+        resultNode = PropertyUtil.unflattenMap(relationshipMap, '.');
+      }
+      return resultNode;
+    }
+  };
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java
new file mode 100644
index 0000000..3c752d6
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltPersistWriter.java
@@ -0,0 +1,77 @@
+package org.apache.streams.neo4j.bolt;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jPersistUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.javatuples.Pair;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 12/16/16.
+ */
+public class Neo4jBoltPersistWriter implements StreamsPersistWriter {
+
+  private Neo4jConfiguration config;
+
+  Neo4jBoltClient client;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
+
+  private static ObjectMapper mapper;
+
+  public Neo4jBoltPersistWriter(Neo4jConfiguration config) {
+    this.config = config;
+
+  }
+
+  @Override
+  public String getId() {
+    return Neo4jBoltPersistWriter.class.getSimpleName();
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    client = Neo4jBoltClient.getInstance(config);
+  }
+
+  @Override
+  public void cleanUp() {
+    //
+  }
+
+  @Override
+  public void write(StreamsDatum entry) {
+
+    List<Pair<String, Map<String, Object>>> statements;
+    Session session = null;
+    try {
+      statements = Neo4jPersistUtil.prepareStatements(entry);
+      session = client.client().session();
+      Transaction transaction = session.beginTransaction();
+      for( Pair<String, Map<String, Object>> statement : statements ) {
+        StatementResult statementResult = transaction.run( statement.getValue0(), statement.getValue1() );
+        LOGGER.debug("StatementResult", statementResult.single());
+      }
+      transaction.success();
+    } catch( Exception ex ) {
+      LOGGER.error("Exception", ex);
+    } finally {
+      if( session != null ) {
+        session.close();
+      }
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java
new file mode 100644
index 0000000..da8c01e
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java
@@ -0,0 +1,74 @@
+package org.apache.streams.neo4j.http;
+
+import org.apache.streams.neo4j.Neo4jConfiguration;
+
+import org.apache.http.client.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class Neo4jHttpClient {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(Neo4jHttpClient.class);
+
+    public Neo4jConfiguration config;
+
+    private HttpClient client;
+
+    private Neo4jHttpClient(Neo4jConfiguration neo4jConfiguration) {
+        this.config = neo4jConfiguration;
+        try {
+            this.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+            this.client = null;
+        }
+    }
+
+    private static Map<Neo4jConfiguration, Neo4jHttpClient> INSTANCE_MAP = new ConcurrentHashMap<Neo4jConfiguration, Neo4jHttpClient>();
+
+    public static Neo4jHttpClient getInstance(Neo4jConfiguration neo4jConfiguration) {
+        if ( INSTANCE_MAP != null &&
+             INSTANCE_MAP.size() > 0 &&
+             INSTANCE_MAP.containsKey(neo4jConfiguration)) {
+            return INSTANCE_MAP.get(neo4jConfiguration);
+        } else {
+            Neo4jHttpClient instance = new Neo4jHttpClient(neo4jConfiguration);
+            if( instance != null && instance.client != null ) {
+                INSTANCE_MAP.put(neo4jConfiguration, instance);
+                return instance;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    public void start() throws Exception {
+
+        Objects.nonNull(config);
+        assertThat("config.getScheme().startsWith(\"http\")", config.getScheme().startsWith("http"));
+
+        LOGGER.info("Neo4jConfiguration.start {}", config);
+
+        Objects.nonNull(client);
+
+    }
+
+    public void stop() throws Exception {
+        this.client = null;
+    }
+
+    public Neo4jConfiguration config() {
+        return config;
+    }
+
+    public HttpClient client() {
+        return client;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java
new file mode 100644
index 0000000..4c126b8
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpGraphHelper.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.http;
+
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Supporting class for interacting with neo4j via rest API.
+ */
+public class Neo4jHttpGraphHelper implements HttpGraphHelper {
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
+
+  private static final String statementKey = "statement";
+  private static final String queryKey = "query";
+  private static final String paramsKey = "parameters";
+  private static final String propsKey = "props";
+
+  /**
+   * readDataStatement neo4j rest json read data payload.
+   *
+   * @param queryPlusParameters (query, parameter map)
+   * @return ObjectNode
+   */
+  public ObjectNode readData(Pair<String, Map<String, Object>> queryPlusParameters) {
+
+    LOGGER.debug("readData: ", queryPlusParameters);
+
+    Objects.requireNonNull(queryPlusParameters);
+    Objects.requireNonNull(queryPlusParameters.getValue0());
+
+    ObjectNode request = MAPPER.createObjectNode();
+
+    request.put(queryKey, queryPlusParameters.getValue0());
+
+    if( queryPlusParameters.getValue1() != null && queryPlusParameters.getValue1().size() > 0) {
+      ObjectNode params = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class);
+      request.put(paramsKey, params);
+    }
+
+    LOGGER.debug("readData: ", request);
+
+    return request;
+  }
+
+  /**
+   * writeDataStatement neo4j rest json write data payload.
+   *
+   * @param queryPlusParameters (query, parameter map)
+   * @return ObjectNode
+   */
+  public ObjectNode writeData(Pair<String, Map<String, Object>> queryPlusParameters) {
+
+    LOGGER.debug("writeData: ", queryPlusParameters);
+
+    Objects.requireNonNull(queryPlusParameters);
+    Objects.requireNonNull(queryPlusParameters.getValue0());
+
+    ObjectNode request = MAPPER.createObjectNode();
+
+    request.put(statementKey, queryPlusParameters.getValue0());
+
+    if( queryPlusParameters.getValue1() != null && queryPlusParameters.getValue1().size() > 0) {
+      ObjectNode params = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class);
+      request.put(paramsKey, params);
+    } else {
+      request.put(paramsKey, MAPPER.createObjectNode());
+    }
+
+    LOGGER.debug("writeData: ", request);
+
+    return request;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java
new file mode 100644
index 0000000..86a9da2
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistReader.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.http;
+
+import org.apache.streams.components.http.HttpConfiguration;
+import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.components.http.provider.SimpleHttpProvider;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.CypherQueryResponse;
+import org.apache.streams.neo4j.ItemData;
+import org.apache.streams.neo4j.ItemMetadata;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Reads a stream of activityobjects from vertices in a graph database with
+ * an http rest endpoint (such as neo4j).
+ */
+public class Neo4jHttpPersistReader extends SimpleHttpProvider implements StreamsPersistReader {
+
+  public static final String STREAMS_ID = Neo4jHttpPersistReader.class.getCanonicalName();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpPersistReader.class);
+
+  private Neo4jReaderConfiguration config;
+
+  private QueryGraphHelper queryGraphHelper;
+  private HttpGraphHelper httpGraphHelper;
+
+  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  /**
+   * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 'graph'.
+   */
+  public Neo4jHttpPersistReader() {
+    this(new ComponentConfigurator<>(Neo4jReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("neo4j")));
+  }
+
+  /**
+   * GraphVertexReader constructor - use supplied GraphReaderConfiguration.
+   * @param configuration GraphReaderConfiguration
+   */
+  public Neo4jHttpPersistReader(Neo4jReaderConfiguration configuration) {
+    super((HttpProviderConfiguration)StreamsJacksonMapper.getInstance().convertValue(configuration, HttpProviderConfiguration.class).withHostname(configuration.getHosts().get(0)));
+    super.configuration.setRequestMethod(HttpConfiguration.RequestMethod.POST);
+    super.configuration.setResourcePath("/db");
+    super.configuration.setResource("data");
+    super.configuration.setResourcePostfix("cypher");
+    this.config = configuration;
+  }
+
+  /**
+   * prepareHttpRequest
+   * @param uri uri
+   * @return result
+   */
+  public HttpRequestBase prepareHttpRequest(URI uri) {
+    HttpRequestBase baseRequest = super.prepareHttpRequest(uri);
+    HttpPost post = (HttpPost) baseRequest;
+    String query = config.getQuery();
+    Map<String, Object> params = mapper.convertValue(config.getParams(), Map.class);
+    Pair<String, Map<String, Object>> queryPlusParams = new Pair(query, params);
+    ObjectNode queryNode = httpGraphHelper.readData(queryPlusParams);
+    try {
+      String queryJsonString = mapper.writeValueAsString(queryNode);
+      HttpEntity entity = new StringEntity(queryJsonString, ContentType.create("application/json"));
+      post.setEntity(entity);
+    } catch (JsonProcessingException ex) {
+      LOGGER.error("JsonProcessingException", ex);
+      return null;
+    }
+    return post;
+
+  }
+  /**
+   * Neo API query returns something like this:
+   * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { "data": { props }, etc... } ] ] }
+   *
+   * @param jsonNode jsonNode
+   * @return result
+   */
+  public List<ObjectNode> parse(JsonNode jsonNode) {
+    List<ObjectNode> results = new ArrayList<>();
+
+    ObjectNode root = (ObjectNode) jsonNode;
+
+    CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, CypherQueryResponse.class);
+
+    for ( List<List<ItemMetadata>> dataWrapper : cypherQueryResponse.getData()) {
+
+      for (List<ItemMetadata> itemMetadatas : dataWrapper) {
+
+        for (ItemMetadata itemMetadata : itemMetadatas) {
+
+          ItemData itemData = itemMetadata.getData();
+
+          LOGGER.debug("itemData: " + itemData);
+
+          results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.'));
+        }
+
+      }
+
+    }
+    return results;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+
+    super.prepare(configurationObject);
+    mapper = StreamsJacksonMapper.getInstance();
+
+    queryGraphHelper = new CypherQueryGraphHelper();
+    httpGraphHelper = new Neo4jHttpGraphHelper();
+
+    Objects.requireNonNull(queryGraphHelper);
+    Objects.requireNonNull(httpGraphHelper);
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+    return readCurrent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java
new file mode 100644
index 0000000..e05a252
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpPersistWriter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.http;
+
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jPersistUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Adds activityobjects as vertices and activities as edges to a graph database with
+ * an http rest endpoint (such as neo4j).
+ */
+public class Neo4jHttpPersistWriter extends SimpleHTTPPostPersistWriter {
+
+  public static final String STREAMS_ID = Neo4jHttpPersistWriter.class.getCanonicalName();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpPersistWriter.class);
+  private static final long MAX_WRITE_LATENCY = 1000;
+
+  private Neo4jConfiguration configuration;
+
+  private QueryGraphHelper queryGraphHelper;
+  private HttpGraphHelper httpGraphHelper;
+
+  private static ObjectMapper mapper;
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  /**
+   * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from JVM 'graph'.
+   */
+  public Neo4jHttpPersistWriter() {
+    this(new ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("neo4j")));
+  }
+
+  /**
+   * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration.
+   * @param configuration GraphHttpConfiguration
+   */
+  public Neo4jHttpPersistWriter(Neo4jConfiguration configuration) {
+    super((HttpPersistWriterConfiguration)StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class).withHostname(configuration.getHosts().get(0)));
+    super.configuration.setResourcePath("/db/data/transaction/commit/");
+    this.configuration = configuration;
+  }
+
+  @Override
+  protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
+
+    List<Pair<String, Map<String, Object>>> statements = Neo4jPersistUtil.prepareStatements(entry);
+
+    ObjectNode requestNode = mapper.createObjectNode();
+    ArrayNode statementsArray = mapper.createArrayNode();
+
+    for( Pair<String, Map<String, Object>> statement : statements ) {
+      statementsArray.add(httpGraphHelper.writeData(statement));
+    }
+
+    requestNode.put("statements", statementsArray);
+    return requestNode;
+
+  }
+
+  @Override
+  protected ObjectNode executePost(HttpPost httpPost) {
+
+    Objects.requireNonNull(httpPost);
+
+    ObjectNode result = null;
+
+    CloseableHttpResponse response = null;
+
+    String entityString = null;
+    try {
+      response = httpclient.execute(httpPost);
+      HttpEntity entity = response.getEntity();
+      if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) {
+        entityString = EntityUtils.toString(entity);
+        result = mapper.readValue(entityString, ObjectNode.class);
+      }
+      LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString);
+      if ( result == null
+           || (
+              result.get("errors") != null
+                  && result.get("errors").isArray()
+                  && result.get("errors").iterator().hasNext()
+              )
+          ) {
+        LOGGER.error("Write Error: " + result.get("errors"));
+      } else {
+        LOGGER.debug("Write Success");
+      }
+    } catch (IOException ex) {
+      LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage());
+    } catch (Exception ex) {
+      LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage());
+    } finally {
+      try {
+        if ( response != null) {
+          response.close();
+        }
+      } catch (IOException ignored) {
+        LOGGER.trace("ignored IOException", ignored);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+
+    super.prepare(null);
+    mapper = StreamsJacksonMapper.getInstance();
+
+    queryGraphHelper = new CypherQueryGraphHelper();
+    httpGraphHelper = new Neo4jHttpGraphHelper();
+
+    Objects.requireNonNull(queryGraphHelper);
+    Objects.requireNonNull(httpGraphHelper);
+  }
+
+  @Override
+  public void cleanUp() {
+
+    LOGGER.info("exiting");
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json
new file mode 100644
index 0000000..4e80eb4
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/CypherQueryResponse.json
@@ -0,0 +1,43 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "javaType" : "org.apache.streams.neo4j.CypherQueryResponse",
+    "properties": {
+        "columns": {
+            "type": "array",
+            "id": "http://jsonschema.net/columns",
+            "required": false,
+            "items": {
+                "type": "string",
+                "id": "http://jsonschema.net/columns/0",
+                "required": false
+            }
+        },
+        "data": {
+            "type": "array",
+            "required": false,
+            "items": {
+                "type": "array",
+                "required": false,
+                "items": {
+                    "type": "array",
+                    "required": false,
+                    "items": {
+                        "type": "object",
+                        "javaType" : "org.apache.streams.neo4j.ItemMetadata",
+                        "properties": {
+                            "data": {
+                                "type": "object",
+                                "javaType" : "org.apache.streams.neo4j.ItemData"
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json
new file mode 100644
index 0000000..abd2391
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jConfiguration.json
@@ -0,0 +1,27 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.neo4j.Neo4jConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "scheme": {
+      "type": "string"
+    },
+    "hosts": {
+      "type": "array",
+      "items": {
+        "type": "string"
+      }
+    },
+    "port": {
+      "type": "integer"
+    },
+    "username": {
+      "type": "string"
+    },
+    "password": {
+      "type": "string"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json
new file mode 100644
index 0000000..62c348f
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/jsonschema/org/apache/streams/neo4j/Neo4jReaderConfiguration.json
@@ -0,0 +1,17 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.neo4j.Neo4jReaderConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends" : {"$ref":"Neo4jConfiguration.json"},
+  "properties": {
+    "query": {
+      "type": "string",
+      "required": "true"
+    },
+    "params": {
+      "type": "object"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
new file mode 100644
index 0000000..c45d975
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jBoltPersistIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.neo4j.bolt.Neo4jBoltClient;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistReader;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for Neo4jBoltPersist.
+ *
+ * Test that graph db responses can be converted to streams data.
+ */
+public class Neo4jBoltPersistIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistIT.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private Neo4jBoltClient testClient;
+
+  private Neo4jConfiguration testConfiguration;
+
+  @BeforeClass
+  public void prepareTest() throws IOException {
+
+    Config reference  = ConfigFactory.load();
+    File conf = new File("target/test-classes/Neo4jBoltPersistIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(typesafe, "neo4j");
+    testClient = Neo4jBoltClient.getInstance(testConfiguration);
+
+    Session session = testClient.client().session();
+    Transaction transaction = session.beginTransaction();
+    transaction.run("MATCH ()-[r]-() DELETE r");
+    transaction.run("MATCH (n) DETACH DELETE n");
+    transaction.success();
+    session.close();
+  }
+
+  @Test
+  public void testNeo4jBoltPersist() throws Exception {
+
+    Neo4jBoltPersistWriter testPersistWriter = new Neo4jBoltPersistWriter(testConfiguration);
+    testPersistWriter.prepare(testConfiguration);
+
+    InputStream testActivityFolderStream = Neo4jBoltPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+
+    int count = 0;
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = Neo4jBoltPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+      if( activity.getActor() != null && activity.getActor().getId() == null && activity.getActor().getObjectType() != null) {
+        activity.getActor().setId(activity.getActor().getObjectType());
+      }
+      if( activity.getObject() != null && activity.getObject().getId() == null && activity.getObject().getObjectType() != null) {
+        activity.getObject().setId(activity.getObject().getObjectType());
+      }
+      if( activity.getTarget() != null && activity.getTarget().getId() == null && activity.getTarget().getObjectType() != null) {
+        activity.getTarget().setId(activity.getTarget().getObjectType());
+      }
+      if( activity.getId() == null && activity.getVerb() != null) {
+        activity.setId(activity.getVerb());
+      }
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write( datum );
+      LOGGER.info("Wrote: " + activity.getVerb() );
+      count++;
+    }
+
+    testPersistWriter.cleanUp();
+
+    LOGGER.info("Total Written: {}", count );
+    Assert.assertEquals(count, 89);
+
+    Neo4jReaderConfiguration vertexReaderConfiguration= MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    vertexReaderConfiguration.setQuery("MATCH (v) return v");
+    Neo4jBoltPersistReader vertexReader = new Neo4jBoltPersistReader(vertexReaderConfiguration);
+    vertexReader.prepare(null);
+    StreamsResultSet vertexResultSet = vertexReader.readAll();
+    LOGGER.info("Total Read: {}", vertexResultSet.size() );
+    Assert.assertEquals(vertexResultSet.size(), 24);
+
+    Neo4jReaderConfiguration edgeReaderConfiguration= MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    edgeReaderConfiguration.setQuery("MATCH (s)-[r]->(d) return r");
+    Neo4jBoltPersistReader edgeReader = new Neo4jBoltPersistReader(edgeReaderConfiguration);
+    edgeReader.prepare(null);
+    StreamsResultSet edgeResultSet = edgeReader.readAll();
+    LOGGER.info("Total Read: {}", edgeResultSet.size() );
+    Assert.assertEquals(edgeResultSet.size(), 100);
+
+  }
+
+  @AfterClass
+  public void cleanup() throws Exception {
+    Session session = testClient.client().session();
+    Transaction transaction = session.beginTransaction();
+    transaction.run("MATCH ()-[r]-() DELETE r");
+    transaction.run("MATCH (n) DETACH DELETE n");
+    transaction.success();
+    session.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
new file mode 100644
index 0000000..a5b0d30
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/Neo4jHttpPersistIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.Neo4jConfiguration;
+import org.apache.streams.neo4j.Neo4jReaderConfiguration;
+import org.apache.streams.neo4j.http.Neo4jHttpClient;
+import org.apache.streams.neo4j.http.Neo4jHttpPersistReader;
+import org.apache.streams.neo4j.http.Neo4jHttpPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for Neo4jHttpPersist.
+ *
+ * Test that graph db responses can be converted to streams data.
+ */
+public class Neo4jHttpPersistIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpPersistIT.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private Neo4jHttpClient testClient;
+
+  private Neo4jConfiguration testConfiguration;
+
+  @BeforeClass
+  public void prepareTest() throws IOException {
+
+    Config reference  = ConfigFactory.load();
+    File conf = new File("target/test-classes/Neo4jHttpPersistIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(Neo4jConfiguration.class).detectConfiguration(typesafe, "neo4j");
+
+  }
+
+  @Test
+  public void testNeo4jHttpPersist() throws Exception {
+
+    Neo4jHttpPersistWriter testPersistWriter = new Neo4jHttpPersistWriter(testConfiguration);
+    testPersistWriter.prepare(null);
+
+    InputStream testActivityFolderStream = Neo4jHttpPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+
+    // write data
+
+    int count = 0;
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = Neo4jHttpPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+      if( activity.getActor() != null && activity.getActor().getId() == null && activity.getActor().getObjectType() != null) {
+        activity.getActor().setId(activity.getActor().getObjectType());
+      }
+      if( activity.getObject() != null && activity.getObject().getId() == null && activity.getObject().getObjectType() != null) {
+        activity.getObject().setId(activity.getObject().getObjectType());
+      }
+      if( activity.getTarget() != null && activity.getTarget().getId() == null && activity.getTarget().getObjectType() != null) {
+        activity.getTarget().setId(activity.getTarget().getObjectType());
+      }
+      if( activity.getId() == null && activity.getVerb() != null) {
+        activity.setId(activity.getVerb());
+      }
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write( datum );
+      LOGGER.info("Wrote: " + activity.getVerb() );
+      count++;
+    }
+
+    testPersistWriter.cleanUp();
+
+    LOGGER.info("Total Written: {}", count );
+    Assert.assertEquals(count, 89);
+
+    Neo4jReaderConfiguration vertexReaderConfiguration= MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    vertexReaderConfiguration.setQuery("MATCH (v) return v");
+    Neo4jHttpPersistReader vertexReader = new Neo4jHttpPersistReader(vertexReaderConfiguration);
+    vertexReader.prepare(null);
+    StreamsResultSet vertexResultSet = vertexReader.readAll();
+    LOGGER.info("Total Read: {}", vertexResultSet.size() );
+    Assert.assertEquals(vertexResultSet.size(), 24);
+
+    Neo4jReaderConfiguration edgeReaderConfiguration= MAPPER.convertValue(testConfiguration, Neo4jReaderConfiguration.class);
+    edgeReaderConfiguration.setQuery("MATCH (s)-[r]->(d) return r");
+    Neo4jHttpPersistReader edgeReader = new Neo4jHttpPersistReader(edgeReaderConfiguration);
+    edgeReader.prepare(null);
+    StreamsResultSet edgeResultSet = edgeReader.readAll();
+    LOGGER.info("Total Read: {}", edgeResultSet.size() );
+    Assert.assertEquals(edgeResultSet.size(), 100);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
new file mode 100644
index 0000000..12f3306
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/test/java/org/apache/streams/neo4j/test/TestCypherQueryGraphHelper.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j.test;
+
+import org.apache.streams.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import org.javatuples.Pair;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * TestCypherQueryGraphHelper tests
+ * @see org.apache.streams.graph.neo4j.CypherQueryGraphHelper
+ */
+public class TestCypherQueryGraphHelper {
+
+  CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
+
+  @Test
+  public void getVertexRequestIdTest() throws Exception {
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id");
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+
+  }
+
+  @Test
+  public void getVertexRequestLongTest() throws Exception {
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(new Long(1));
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+
+  }
+
+  @Test
+  public void createVertexRequestTest() throws Exception {
+
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setId("id");
+    activityObject.setObjectType("type");
+    activityObject.setContent("content");
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void mergeVertexRequestTest() throws Exception {
+
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setId("id");
+    activityObject.setObjectType("type");
+    activityObject.setContent("content");
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void createActorObjectEdgeRequestTest() throws Exception {
+
+    ActivityObject actor = new ActivityObject();
+    actor.setId("actor");
+    actor.setObjectType("type");
+    actor.setContent("content");
+
+    ActivityObject object = new ActivityObject();
+    object.setId("object");
+    object.setObjectType("type");
+    object.setContent("content");
+
+    Activity activity = new Activity();
+    activity.setId("activity");
+    activity.setVerb("verb");
+    activity.setContent("content");
+
+    activity.setActor(actor);
+    activity.setObject(object);
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.createActorObjectEdge(activity);
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+
+  @Test
+  public void createEdgeRequestTest() throws Exception {
+
+    ActivityObject actor = new ActivityObject();
+    actor.setId("actor");
+    actor.setObjectType("type");
+    actor.setContent("content");
+
+    ActivityObject object = new ActivityObject();
+    object.setId("object");
+    object.setObjectType("type");
+    object.setContent("content");
+
+    ActivityObject target = new ActivityObject();
+    object.setId("target");
+    object.setObjectType("type");
+
+    Activity activity = new Activity();
+    activity.setId("activity");
+    activity.setVerb("verb");
+    activity.setContent("content");
+
+    activity.setActor(actor);
+    activity.setObject(object);
+    activity.setObject(target);
+
+    Pair<String, Map<String, Object>> queryAndParams = helper.createActorTargetEdge(activity);
+
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
new file mode 100644
index 0000000..1d5ec35
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jBoltPersistIT.conf
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+neo4j {
+  scheme = "tcp"
+  hosts += ${neo4j.tcp.host}
+  port = ${neo4j.tcp.port}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf
new file mode 100644
index 0000000..929b3ed
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/test/resources/Neo4jHttpPersistIT.conf
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+neo4j {
+  scheme = "http"
+  hosts += ${neo4j.http.host}
+  port = ${neo4j.http.port}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
deleted file mode 100644
index 981de44..0000000
--- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.util;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.node.ValueNode;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- *  Class transforms nested properties of activities, actors, objects, etc...
- */
-public class PropertyUtil {
-
-  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-  public static Map<String, Object> flattenToMap(ObjectNode object) {
-    Map<String, Object> flatObject = new HashMap<>();
-    addKeys("", object, flatObject, '.');
-    return flatObject;
-  }
-
-  public static Map<String, Object> flattenToMap(ObjectNode object, char seperator) {
-    Map<String, Object> flatObject = new HashMap<>();
-    addKeys("", object, flatObject, seperator);
-    return flatObject;
-  }
-
-  public static ObjectNode flattenToObjectNode(ObjectNode object) {
-    Map<String, Object> flatObject = flattenToMap(object, '.');
-    addKeys("", object, flatObject, '.');
-    return mapper.convertValue(flatObject, ObjectNode.class);
-  }
-
-  public static ObjectNode flattenToObjectNode(ObjectNode object, char seperator) {
-    Map<String, Object> flatObject = flattenToMap(object, seperator);
-    addKeys("", object, flatObject, seperator);
-    return mapper.convertValue(flatObject, ObjectNode.class);
-  }
-
-  private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, Object> map, char seperator) {
-    if (jsonNode.isObject()) {
-      ObjectNode objectNode = (ObjectNode) jsonNode;
-      Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
-      String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator;
-
-      while (iter.hasNext()) {
-        Map.Entry<String, JsonNode> entry = iter.next();
-        addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator);
-      }
-    } else if (jsonNode.isArray()) {
-      ArrayNode arrayNode = (ArrayNode) jsonNode;
-      map.put(currentPath, arrayNode);
-    } else if (jsonNode.isValueNode()) {
-      ValueNode valueNode = (ValueNode) jsonNode;
-      if ( valueNode.isTextual() ) {
-        map.put(currentPath, valueNode.asText());
-      } else if ( valueNode.isNumber() ) {
-        map.put(currentPath, valueNode);
-      }
-    }
-  }
-
-  public static ObjectNode unflattenMap(Map<String, Object> object, char seperator) {
-    return unflattenObjectNode(mapper.convertValue(object, ObjectNode.class), seperator);
-  }
-
-  public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char seperator) {
-    ObjectNode root = mapper.createObjectNode();
-    Iterator<Map.Entry<String, JsonNode>> iter = flatObject.fields();
-    while (iter.hasNext()) {
-      Map.Entry<String, JsonNode> item = iter.next();
-      String fullKey = item.getKey();
-      if ( !fullKey.contains(Character.valueOf(seperator).toString())) {
-        root.put(item.getKey(), item.getValue());
-      } else {
-        ObjectNode currentNode = root;
-        List<String> keyParts = new ArrayList<>();
-        Iterables.addAll(keyParts, Splitter.on(seperator).split(item.getKey()));
-        for (String part : Iterables.limit(Splitter.on(seperator).split(item.getKey()), keyParts.size() - 1)) {
-          if (currentNode.has(part) && currentNode.get(part).isObject()) {
-            currentNode = (ObjectNode) currentNode.get(part);
-          } else {
-            ObjectNode newNode = mapper.createObjectNode();
-            currentNode.put(part, newNode);
-            currentNode = newNode;
-          }
-        }
-        currentNode.put(keyParts.get(keyParts.size() - 1), item.getValue());
-
-      }
-    }
-    return root;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java b/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java
new file mode 100644
index 0000000..1671174
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.ValueNode;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *  Class transforms nested properties of activities, actors, objects, etc...
+ */
+public class PropertyUtil {
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  public static Map<String, Object> flattenToMap(ObjectNode object) {
+    Map<String, Object> flatObject = new HashMap<>();
+    addKeys(new String(), object, flatObject, '.');
+    return flatObject;
+  }
+
+  public static ObjectNode flattenToObjectNode(ObjectNode object) {
+    Map<String, Object> flatObject = flattenToMap(object, '.');
+    addKeys(new String(), object, flatObject, '.');
+    return mapper.convertValue(flatObject, ObjectNode.class);
+  }
+
+  public static Map<String, Object> flattenToMap(ObjectNode object, char seperator) {
+    Map<String, Object> flatObject = new HashMap<>();
+    addKeys(new String(), object, flatObject, seperator);
+    return flatObject;
+  }
+
+  public static ObjectNode flattenToObjectNode(ObjectNode object, char seperator) {
+    Map<String, Object> flatObject = flattenToMap(object, seperator);
+    addKeys(new String(), object, flatObject, seperator);
+    return mapper.convertValue(flatObject, ObjectNode.class);
+  }
+
+  private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, Object> map, char seperator) {
+    if (jsonNode.isObject()) {
+      ObjectNode objectNode = (ObjectNode) jsonNode;
+      Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
+      String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator;
+
+      while (iter.hasNext()) {
+        Map.Entry<String, JsonNode> entry = iter.next();
+        addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator);
+      }
+    } else if (jsonNode.isArray()) {
+      ArrayNode arrayNode = (ArrayNode) jsonNode;
+      if( arrayNode.isTextual()) {
+        List<String> list = mapper.convertValue(arrayNode, List.class);
+        map.put(currentPath, list);
+      }
+      if( arrayNode.isNumber()) {
+        List<String> list = mapper.convertValue(arrayNode, List.class);
+        map.put(currentPath, list);
+      }
+    } else if (jsonNode.isValueNode()) {
+      ValueNode valueNode = (ValueNode) jsonNode;
+      if( valueNode.isTextual() )
+        map.put(currentPath, valueNode.asText());
+      else if ( valueNode.isNumber() )
+        map.put(currentPath, valueNode);
+    }
+  }
+
+  public static ObjectNode unflattenMap(Map<String, Object> object, char seperator) {
+    return unflattenObjectNode(mapper.convertValue(object, ObjectNode.class), seperator);
+  }
+
+  public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char seperator) {
+    ObjectNode root = mapper.createObjectNode();
+    Iterator<Map.Entry<String, JsonNode>> iter = flatObject.fields();
+    while (iter.hasNext()) {
+      Map.Entry<String, JsonNode> item = iter.next();
+      String fullKey = item.getKey();
+      if( !fullKey.contains(Character.valueOf(seperator).toString())) {
+        root.put(item.getKey(), item.getValue());
+      } else {
+        ObjectNode currentNode = root;
+        List<String> keyParts = new ArrayList<>(Arrays.asList(StringUtils.split(item.getKey(), seperator)));
+        keyParts.remove(keyParts.size()-1);
+        Iterator<String> keyPartIterator = keyParts.iterator();
+        while( keyPartIterator.hasNext()) {
+          String part = keyPartIterator.next();
+          if( currentNode.has(part) && currentNode.get(part).isObject() ) {
+            currentNode = (ObjectNode) currentNode.get(part);
+          } else {
+            ObjectNode newNode = mapper.createObjectNode();
+            currentNode.put(part, newNode);
+            currentNode = newNode;
+          }
+        };
+        currentNode.put(keyParts.get(keyParts.size()-1), item.getValue());
+
+      }
+    }
+    return root;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java b/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
new file mode 100644
index 0000000..233a431
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/schema/test/PropertyUtilTest.java
@@ -0,0 +1,25 @@
+package org.apache.streams.util.schema.test;
+
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.testng.annotations.Test;
+
+/**
+ * Created by sblackmon on 1/8/17.
+ */
+public class PropertyUtilTest {
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  String flatJson = "{\"a.a\": \"aa\", \"a.b\": \"ab\", \"b.a\": \"ba\", \"b.b\": \"bb\"}";
+
+  @Test
+  public void testUnflattenObjectNode() throws Exception {
+    ObjectNode flatNode = mapper.readValue(flatJson, ObjectNode.class);
+    ObjectNode unflattenedNode = PropertyUtil.unflattenObjectNode(flatNode, '.');
+    assert(unflattenedNode.size() == 2);
+  }
+}
+


[2/2] incubator-streams git commit: STREAMS:344: streams-persist-neo4j

Posted by sb...@apache.org.
STREAMS:344: streams-persist-neo4j

Squashed commit of the following:

commit 76207b1577a0fb6f05992c8700151223db20e4b3
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sun Jan 8 12:29:32 2017 -0600

    STREAMS-344: address PR feedback

    from https://github.com/apache/incubator-streams/pull/348

commit ee700fd16e8631bdb0fb453d686beef4167af13b
Author: Steve Blackmon <sb...@apache.org>
Date:   Mon Jan 2 19:42:33 2017 -0600

    add constructor

commit 1f4e175cf84a208252d488c2858ea420af0642f9
Author: Steve Blackmon <sb...@apache.org>
Date:   Mon Jan 2 18:11:01 2017 -0600

    new neo4j module with bolt:// and http:// support, and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4bd22317
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4bd22317
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4bd22317

Branch: refs/heads/master
Commit: 4bd22317ea3a67b7dfdc0c9d3aba96a71f712e3a
Parents: 7810361
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Sun Jan 8 12:36:18 2017 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sun Jan 8 12:36:18 2017 -0600

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   3 +-
 streams-contrib/streams-persist-graph/pom.xml   |  20 +-
 .../streams/graph/GraphHttpPersistWriter.java   | 250 --------------
 .../apache/streams/graph/GraphVertexReader.java | 126 -------
 .../apache/streams/graph/HttpGraphHelper.java   |   4 +-
 .../apache/streams/graph/QueryGraphHelper.java  |   4 +-
 .../graph/neo4j/CypherQueryGraphHelper.java     | 238 -------------
 .../graph/neo4j/Neo4jHttpGraphHelper.java       |  75 ----
 .../streams/graph/GraphBinaryConfiguration.json |  28 --
 .../streams/graph/GraphConfiguration.json       |  22 --
 .../streams/graph/GraphHttpConfiguration.json   |  22 --
 .../graph/neo4j/CypherQueryResponse.json        |  43 ---
 .../graph/test/TestCypherQueryGraphHelper.java  | 116 -------
 .../graph/test/TestNeo4jHttpVertexReader.java   |  81 -----
 streams-contrib/streams-persist-neo4j/pom.xml   | 263 ++++++++++++++
 .../streams/neo4j/CypherQueryGraphHelper.java   | 344 +++++++++++++++++++
 .../apache/streams/neo4j/Neo4jPersistUtil.java  | 151 ++++++++
 .../streams/neo4j/bolt/Neo4jBoltClient.java     |  92 +++++
 .../neo4j/bolt/Neo4jBoltPersistReader.java      | 326 ++++++++++++++++++
 .../neo4j/bolt/Neo4jBoltPersistWriter.java      |  77 +++++
 .../streams/neo4j/http/Neo4jHttpClient.java     |  74 ++++
 .../neo4j/http/Neo4jHttpGraphHelper.java        | 104 ++++++
 .../neo4j/http/Neo4jHttpPersistReader.java      | 173 ++++++++++
 .../neo4j/http/Neo4jHttpPersistWriter.java      | 171 +++++++++
 .../streams/neo4j/CypherQueryResponse.json      |  43 +++
 .../streams/neo4j/Neo4jConfiguration.json       |  27 ++
 .../streams/neo4j/Neo4jReaderConfiguration.json |  17 +
 .../streams/neo4j/test/Neo4jBoltPersistIT.java  | 156 +++++++++
 .../streams/neo4j/test/Neo4jHttpPersistIT.java  | 138 ++++++++
 .../neo4j/test/TestCypherQueryGraphHelper.java  | 150 ++++++++
 .../src/test/resources/Neo4jBoltPersistIT.conf  |  20 ++
 .../src/test/resources/Neo4jHttpPersistIT.conf  |  20 ++
 .../apache/streams/data/util/PropertyUtil.java  | 124 -------
 .../org/apache/streams/util/PropertyUtil.java   | 130 +++++++
 .../util/schema/test/PropertyUtilTest.java      |  25 ++
 35 files changed, 2510 insertions(+), 1147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 8408cef..aed60c9 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -37,8 +37,8 @@
     </properties>
 
     <modules>
-        <module>streams-persist-console</module>
         <module>streams-persist-cassandra</module>
+        <module>streams-persist-console</module>
         <module>streams-persist-elasticsearch</module>
         <module>streams-persist-filebuffer</module>
         <module>streams-persist-hbase</module>
@@ -46,6 +46,7 @@
         <module>streams-persist-graph</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
+        <module>streams-persist-neo4j</module>
         <module>streams-amazon-aws</module>
         <module>streams-processor-jackson</module>
         <module>streams-processor-json</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml
index b8db538..996c706 100644
--- a/streams-contrib/streams-persist-graph/pom.xml
+++ b/streams-contrib/streams-persist-graph/pom.xml
@@ -147,25 +147,7 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <configuration>
-                    <includes>**/*.json</includes>
-                    <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
-                    <includeGroupIds>org.apache.streams</includeGroupIds>
-                    <includeTypes>test-jar</includeTypes>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test-resource-dependencies</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>unpack-dependencies</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
+
             <!-- revisit using this if streams bumps to jdk8 -->
             <!--<plugin>-->
                 <!--<groupId>com.github.harti2006</groupId>-->

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
deleted file mode 100644
index 5b2dec6..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
-import org.apache.streams.graph.neo4j.Neo4jHttpGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Adds activityobjects as vertices and activities as edges to a graph database with
- * an http rest endpoint (such as neo4j).
- */
-public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
-
-  public static final String STREAMS_ID = GraphHttpPersistWriter.class.getCanonicalName();
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GraphHttpPersistWriter.class);
-  private static final long MAX_WRITE_LATENCY = 1000;
-
-  private GraphHttpConfiguration configuration;
-
-  private QueryGraphHelper queryGraphHelper;
-  private HttpGraphHelper httpGraphHelper;
-
-  private static ObjectMapper mapper;
-
-  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-  /**
-   * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from JVM 'graph'.
-   */
-  public GraphHttpPersistWriter() {
-    this(new ComponentConfigurator<>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
-  }
-
-  /**
-   * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration.
-   * @param configuration GraphHttpConfiguration
-   */
-  public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
-    super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class));
-    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-      super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit/");
-    } else if ( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
-      super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
-    }
-    this.configuration = configuration;
-  }
-
-  @Override
-  protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
-
-    Activity activity = null;
-    ActivityObject activityObject;
-    Object document = entry.getDocument();
-
-    if (document instanceof Activity) {
-      activity = (Activity) document;
-      activityObject = activity.getObject();
-    } else if (document instanceof ActivityObject) {
-      activityObject = (ActivityObject) document;
-    } else {
-      ObjectNode objectNode;
-      if (document instanceof ObjectNode) {
-        objectNode = (ObjectNode) document;
-      } else if ( document instanceof String) {
-        try {
-          objectNode = mapper.readValue((String) document, ObjectNode.class);
-        } catch (IOException ex) {
-          LOGGER.error("Can't handle input: ", entry);
-          throw ex;
-        }
-      } else {
-        LOGGER.error("Can't handle input: ", entry);
-        throw new Exception("Can't create payload from datum.");
-      }
-
-      if ( objectNode.get("verb") != null ) {
-        try {
-          activity = mapper.convertValue(objectNode, Activity.class);
-          activityObject = activity.getObject();
-        } catch (Exception ex) {
-          activityObject = mapper.convertValue(objectNode, ActivityObject.class);
-        }
-      } else {
-        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
-      }
-    }
-
-    Preconditions.checkArgument(activity != null || activityObject != null);
-
-    ObjectNode request = mapper.createObjectNode();
-    ArrayNode statements = mapper.createArrayNode();
-
-    // always add vertices first
-
-    List<String> labels = Collections.singletonList("streams");
-
-    if ( activityObject != null ) {
-      if ( activityObject.getObjectType() != null ) {
-        labels.add(activityObject.getObjectType());
-      }
-      statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
-    }
-
-    if ( activity != null ) {
-
-      ActivityObject actor = activity.getActor();
-      Provider provider = activity.getProvider();
-
-      if (provider != null && StringUtils.isNotBlank(provider.getId())) {
-        labels.add(provider.getId());
-      }
-      if (actor != null && StringUtils.isNotBlank(actor.getId())) {
-        if (actor.getObjectType() != null) {
-          labels.add(actor.getObjectType());
-        }
-        statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor)));
-      }
-
-      if (activityObject != null && StringUtils.isNotBlank(activityObject.getId())) {
-        if (activityObject.getObjectType() != null) {
-          labels.add(activityObject.getObjectType());
-        }
-        statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
-      }
-
-      // then add edge
-
-      if (StringUtils.isNotBlank(activity.getVerb())) {
-        statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity)));
-      }
-    }
-
-    request.put("statements", statements);
-    return request;
-
-  }
-
-  @Override
-  protected ObjectNode executePost(HttpPost httpPost) {
-
-    Objects.requireNonNull(httpPost);
-
-    ObjectNode result = null;
-
-    CloseableHttpResponse response = null;
-
-    String entityString = null;
-    try {
-      response = httpclient.execute(httpPost);
-      HttpEntity entity = response.getEntity();
-      if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) {
-        entityString = EntityUtils.toString(entity);
-        result = mapper.readValue(entityString, ObjectNode.class);
-      }
-      LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString);
-      if ( result == null
-           || (
-              result.get("errors") != null
-                  && result.get("errors").isArray()
-                  && result.get("errors").iterator().hasNext()
-              )
-          ) {
-        LOGGER.error("Write Error: " + result.get("errors"));
-      } else {
-        LOGGER.debug("Write Success");
-      }
-    } catch (IOException ex) {
-      LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage());
-    } catch (Exception ex) {
-      LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage());
-    } finally {
-      try {
-        if ( response != null) {
-          response.close();
-        }
-      } catch (IOException ignored) {
-        LOGGER.trace("ignored IOException", ignored);
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void prepare(Object configurationObject) {
-
-    super.prepare(configuration);
-    mapper = StreamsJacksonMapper.getInstance();
-
-    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-      queryGraphHelper = new CypherQueryGraphHelper();
-      httpGraphHelper = new Neo4jHttpGraphHelper();
-    }
-
-    Objects.requireNonNull(queryGraphHelper);
-    Objects.requireNonNull(httpGraphHelper);
-  }
-
-  @Override
-  public void cleanUp() {
-
-    LOGGER.info("exiting");
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
deleted file mode 100644
index 9560083..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import org.apache.streams.components.http.HttpProviderConfiguration;
-import org.apache.streams.components.http.provider.SimpleHttpProvider;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.neo4j.CypherQueryResponse;
-import org.apache.streams.graph.neo4j.ItemData;
-import org.apache.streams.graph.neo4j.ItemMetadata;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Reads a stream of activityobjects from vertices in a graph database with
- * an http rest endpoint (such as neo4j).
- */
-public class GraphVertexReader extends SimpleHttpProvider implements StreamsPersistReader {
-
-  public static final String STREAMS_ID = GraphVertexReader.class.getCanonicalName();
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GraphVertexReader.class);
-
-  protected GraphReaderConfiguration configuration;
-
-  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-  /**
-   * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 'graph'.
-   */
-  public GraphVertexReader() {
-    this(new ComponentConfigurator<>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
-  }
-
-  /**
-   * GraphVertexReader constructor - use supplied GraphReaderConfiguration.
-   * @param configuration GraphReaderConfiguration
-   */
-  public GraphVertexReader(GraphReaderConfiguration configuration) {
-    super(mapper.convertValue(configuration, HttpProviderConfiguration.class));
-    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-      super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
-    } else if ( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
-      super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
-    }
-    this.configuration = configuration;
-  }
-
-  /**
-   * Neo API query returns something like this:
-   * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { "data": { props }, etc... } ] ] }
-   *
-   * @param jsonNode jsonNode
-   * @return result
-   */
-  public List<ObjectNode> parse(JsonNode jsonNode) {
-    List<ObjectNode> results = new ArrayList<>();
-
-    ObjectNode root = (ObjectNode) jsonNode;
-
-    CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, CypherQueryResponse.class);
-
-    for ( List<List<ItemMetadata>> dataWrapper : cypherQueryResponse.getData()) {
-
-      for (List<ItemMetadata> itemMetadatas : dataWrapper) {
-
-        for (ItemMetadata itemMetadata : itemMetadatas) {
-
-          ItemData itemData = itemMetadata.getData();
-
-          LOGGER.debug("itemData: " + itemData);
-
-          results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.'));
-        }
-
-      }
-
-    }
-    return results;
-  }
-
-  @Override
-  public String getId() {
-    return STREAMS_ID;
-  }
-
-  @Override
-  public void prepare(Object configurationObject) {
-
-    super.prepare(configurationObject);
-
-  }
-
-  @Override
-  public StreamsResultSet readAll() {
-    return readCurrent();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
index ca1f4e4..804e9ff 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
@@ -29,6 +29,8 @@ import java.util.Map;
  */
 public interface HttpGraphHelper {
 
-  ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters);
+  ObjectNode readData(Pair<String, Map<String, Object>> queryPlusParameters);
+
+  ObjectNode writeData(Pair<String, Map<String, Object>> queryPlusParameters);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
index 1699aee..38ceb55 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
@@ -39,6 +39,8 @@ public interface QueryGraphHelper {
 
   public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject);
 
-  public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity);
+  public Pair<String, Map<String, Object>> createActorObjectEdge(Activity activity);
+
+  public Pair<String, Map<String, Object>> createActorTargetEdge(Activity activity);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
deleted file mode 100644
index a361139..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.neo4j;
-
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.QueryGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.javatuples.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.stringtemplate.v4.ST;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Supporting class for interacting with neo4j via rest API
- */
-public class CypherQueryGraphHelper implements QueryGraphHelper {
-
-  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
-
-  private static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v";
-  private static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
-
-  private static final String createVertexStatementTemplate =
-      "MATCH (x {id: '<id>'}) "
-          + "CREATE UNIQUE (v:<type> { props }) "
-          + "ON CREATE SET v <labels> "
-          + "RETURN v";
-
-
-
-  private static final String mergeVertexStatementTemplate =
-      "MERGE (v:<type> {id: '<id>'}) "
-          + "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "
-          + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "
-          + "RETURN v";
-
-  private static final String createEdgeStatementTemplate =
-      "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "
-          + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "
-          + "RETURN r";
-
-  /**
-   * getVertexRequest.
-   * @param streamsId streamsId
-   * @return pair (streamsId, parameterMap)
-   */
-  public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
-
-    ST getVertex = new ST(getVertexStringIdStatementTemplate);
-    getVertex.add("id", streamsId);
-
-    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
-
-    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
-
-    return queryPlusParameters;
-  }
-
-  /**
-   * getVertexRequest.
-   * @param vertexId numericId
-   * @return pair (streamsId, parameterMap)
-   */
-  @Override
-  public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
-
-    ST getVertex = new ST(getVertexLongIdStatementTemplate);
-    getVertex.add("id", vertexId);
-
-    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
-
-    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
-
-    return queryPlusParameters;
-
-  }
-
-  /**
-   * createVertexRequest.
-   * @param activityObject activityObject
-   * @return pair (query, parameterMap)
-   */
-  public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
-
-    Objects.requireNonNull(activityObject.getObjectType());
-
-    List<String> labels = getLabels(activityObject);
-
-    ST createVertex = new ST(createVertexStatementTemplate);
-    createVertex.add("id", activityObject.getId());
-    createVertex.add("type", activityObject.getObjectType());
-
-    if ( labels.size() > 0 ) {
-      createVertex.add("labels", String.join(" ", labels));
-    }
-
-    String query = createVertex.render();
-
-    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
-    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props);
-
-    LOGGER.debug("createVertexRequest: ({},{})", query, props);
-
-    return queryPlusParameters;
-  }
-
-  /**
-   * mergeVertexRequest.
-   * @param activityObject activityObject
-   * @return pair (query, parameterMap)
-   */
-  public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) {
-
-    Objects.requireNonNull(activityObject.getObjectType());
-
-    Pair queryPlusParameters = new Pair(null, new HashMap<>());
-
-    List<String> labels = getLabels(activityObject);
-
-    ST mergeVertex = new ST(mergeVertexStatementTemplate);
-    mergeVertex.add("id", activityObject.getId());
-    mergeVertex.add("type", activityObject.getObjectType());
-    if ( labels.size() > 0 ) {
-      mergeVertex.add("labels", String.join(" ", labels));
-    }
-    String query = mergeVertex.render();
-
-    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
-    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-    LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
-
-    queryPlusParameters = queryPlusParameters.setAt0(query);
-    queryPlusParameters = queryPlusParameters.setAt1(props);
-
-    return queryPlusParameters;
-  }
-
-  /**
-   * createEdgeRequest.
-   * @param activity activity
-   * @return pair (query, parameterMap)
-   */
-  public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) {
-
-    Pair queryPlusParameters = new Pair(null, new HashMap<>());
-
-    ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
-    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-    ST mergeEdge = new ST(createEdgeStatementTemplate);
-    mergeEdge.add("s_id", activity.getActor().getId());
-    mergeEdge.add("s_type", activity.getActor().getObjectType());
-    mergeEdge.add("d_id", activity.getObject().getId());
-    mergeEdge.add("d_type", activity.getObject().getObjectType());
-    mergeEdge.add("r_id", activity.getId());
-    mergeEdge.add("r_type", activity.getVerb());
-    mergeEdge.add("r_props", getPropertyCreater(props));
-
-    // set the activityObject's and extensions null, because their properties don't need to appear on the relationship
-    activity.setActor(null);
-    activity.setObject(null);
-    activity.setTarget(null);
-    activity.getAdditionalProperties().put("extensions", null);
-
-    String statement = mergeEdge.render();
-    queryPlusParameters = queryPlusParameters.setAt0(statement);
-    queryPlusParameters = queryPlusParameters.setAt1(props);
-
-    LOGGER.debug("createEdgeRequest: ({},{})", statement, props);
-
-    return queryPlusParameters;
-  }
-
-  /**
-   * getPropertyCreater.
-   * @param map paramMap
-   * @return PropertyCreater string
-   */
-  public static String getPropertyCreater(Map<String, Object> map) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("{");
-    List<String> parts = new ArrayList<>();
-    for ( Map.Entry<String, Object> entry : map.entrySet()) {
-      if ( entry.getValue() instanceof String ) {
-        String propVal = (String) (entry.getValue());
-        parts.add("`" + entry.getKey() + "`:'" + propVal + "'");
-      }
-    }
-    builder.append(String.join(",", parts));
-    builder.append("}");
-    return builder.toString();
-  }
-
-  private List<String> getLabels(ActivityObject activityObject) {
-    List<String> labels = Collections.singletonList(":streams");
-    if ( activityObject.getAdditionalProperties().containsKey("labels") ) {
-      List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
-      for ( String extraLabel : extraLabels ) {
-        labels.add(":" + extraLabel);
-      }
-    }
-    return labels;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
deleted file mode 100644
index 9f47058..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.neo4j;
-
-import org.apache.streams.graph.HttpGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.javatuples.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Supporting class for interacting with neo4j via rest API.
- */
-public class Neo4jHttpGraphHelper implements HttpGraphHelper {
-
-  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
-
-  private static final String statementKey = "statement";
-  private static final String paramsKey = "parameters";
-  private static final String propsKey = "props";
-
-  /**
-   * createHttpRequest neo4j rest json payload.
-   *
-   * @param queryPlusParameters (query, parameter map)
-   * @return ObjectNode
-   */
-  public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) {
-
-    LOGGER.debug("createHttpRequest: ", queryPlusParameters);
-
-    Objects.requireNonNull(queryPlusParameters);
-    Objects.requireNonNull(queryPlusParameters.getValue0());
-    Objects.requireNonNull(queryPlusParameters.getValue1());
-
-    ObjectNode request = MAPPER.createObjectNode();
-
-    request.put(statementKey, queryPlusParameters.getValue0());
-
-    ObjectNode params = MAPPER.createObjectNode();
-    ObjectNode props = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class);
-
-    params.put(propsKey, props);
-    request.put(paramsKey, params);
-
-    LOGGER.debug("createHttpRequest: ", request);
-
-    return request;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
deleted file mode 100644
index 04a70e1..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphBinaryConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "type": {
-            "type": "string",
-            "description": "Graph DB type",
-            "enum" : ["neo4j", "gremlin"]
-        },
-        "file": {
-            "type": "string",
-            "description": "New Graph DB File"
-        },
-        "indexFields": {
-            "type": "array",
-            "items": {
-                "type": "string"
-            },
-            "description": "Fields to index under streams label"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
deleted file mode 100644
index de92443..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphConfiguration",
-    "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "type": {
-            "type": "string",
-            "description": "Graph DB type",
-            "enum" : ["neo4j", "rexster"]
-        },
-        "graph": {
-            "type": "string",
-            "description": "Graph DB Graph ID"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
deleted file mode 100644
index c63e0fb..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphHttpConfiguration",
-    "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "type": {
-            "type": "string",
-            "description": "Graph DB type",
-            "enum" : ["neo4j", "rexster"]
-        },
-        "graph": {
-            "type": "string",
-            "description": "Graph DB Graph ID"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
deleted file mode 100644
index 77c6fd7..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
+++ /dev/null
@@ -1,43 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.neo4j.CypherQueryResponse",
-    "properties": {
-        "columns": {
-            "type": "array",
-            "id": "http://jsonschema.net/columns",
-            "required": false,
-            "items": {
-                "type": "string",
-                "id": "http://jsonschema.net/columns/0",
-                "required": false
-            }
-        },
-        "data": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "array",
-                "required": false,
-                "items": {
-                    "type": "array",
-                    "required": false,
-                    "items": {
-                        "type": "object",
-                        "javaType" : "org.apache.streams.graph.neo4j.ItemMetadata",
-                        "properties": {
-                            "data": {
-                                "type": "object",
-                                "javaType" : "org.apache.streams.graph.neo4j.ItemData"
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
deleted file mode 100644
index c29c8b7..0000000
--- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.test;
-
-import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-
-import org.javatuples.Pair;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- * TestCypherQueryGraphHelper tests
- * @see org.apache.streams.graph.neo4j.CypherQueryGraphHelper
- */
-public class TestCypherQueryGraphHelper {
-
-  CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
-
-  @Test
-  public void getVertexRequestIdTest() throws Exception {
-
-    Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id");
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-
-  }
-
-  @Test
-  public void getVertexRequestLongTest() throws Exception {
-
-    Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(new Long(1));
-
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-
-  }
-
-  @Test
-  public void createVertexRequestTest() throws Exception {
-
-    ActivityObject activityObject = new ActivityObject();
-    activityObject.setId("id");
-    activityObject.setObjectType("type");
-    activityObject.setContent("content");
-
-    Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject);
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-    assert(queryAndParams.getValue1() != null);
-
-  }
-
-  @Test
-  public void mergeVertexRequestTest() throws Exception {
-
-    ActivityObject activityObject = new ActivityObject();
-    activityObject.setId("id");
-    activityObject.setObjectType("type");
-    activityObject.setContent("content");
-
-    Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject);
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-    assert(queryAndParams.getValue1() != null);
-
-  }
-
-  @Test
-  public void createEdgeRequestTest() throws Exception {
-
-    ActivityObject actor = new ActivityObject();
-    actor.setId("actor");
-    actor.setObjectType("type");
-    actor.setContent("content");
-
-    ActivityObject object = new ActivityObject();
-    object.setId("object");
-    object.setObjectType("type");
-    object.setContent("content");
-
-    Activity activity = new Activity();
-    activity.setId("activity");
-    activity.setVerb("verb");
-    activity.setContent("content");
-
-    activity.setActor(actor);
-    activity.setObject(object);
-
-    Pair<String, Map<String, Object>> queryAndParams = helper.createEdgeRequest(activity);
-
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-    assert(queryAndParams.getValue1() != null);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
deleted file mode 100644
index 24ddd65..0000000
--- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.test;
-
-import org.apache.streams.graph.GraphHttpConfiguration;
-import org.apache.streams.graph.GraphReaderConfiguration;
-import org.apache.streams.graph.GraphVertexReader;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.IOUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-/**
- * Unit test for {@link org.apache.streams.graph.GraphVertexReader}
- *
- * Test that graph db responses can be converted to streams data.
- */
-public class TestNeo4jHttpVertexReader {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(TestNeo4jHttpVertexReader.class);
-
-  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-  private JsonNode sampleReaderResult;
-
-  private GraphReaderConfiguration testConfiguration;
-
-  private GraphVertexReader graphPersistReader;
-
-  @Before
-  public void prepareTest() throws IOException {
-
-    testConfiguration = new GraphReaderConfiguration();
-    testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
-
-    graphPersistReader = new GraphVertexReader(testConfiguration);
-    InputStream testActivityFileStream = TestNeo4jHttpVertexReader.class.getClassLoader()
-        .getResourceAsStream("sampleReaderResult.json");
-    String sampleText = IOUtils.toString(testActivityFileStream, "utf-8");
-    sampleReaderResult = mapper.readValue(sampleText, JsonNode.class);
-
-  }
-
-  @Test
-  public void testParseNeoResult() throws IOException {
-
-    List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult);
-
-    assert( result.size() == 10);
-
-    for( int i = 0 ; i < 10; i++ )
-      assert( result.get(i).get("extensions").size() == 5);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/pom.xml b/streams-contrib/streams-persist-neo4j/pom.xml
new file mode 100644
index 0000000..d117558
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/pom.xml
@@ -0,0 +1,263 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.5-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-neo4j</artifactId>
+    <name>streams-persist-neo4j</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-converters</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-graph</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>stringtemplate</artifactId>
+            <version>4.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.javatuples</groupId>
+            <artifactId>javatuples</artifactId>
+            <version>1.2</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j.driver</groupId>
+            <artifactId>neo4j-java-driver</artifactId>
+            <version>1.0.6</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-schema-activitystreams</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-testing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.streams.plugins</groupId>
+                <artifactId>streams-plugin-pojo</artifactId>
+                <version>${project.version}</version>
+                <configuration>
+                    <sourcePaths>
+                        <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory>
+                    <targetPackage>org.apache.streams.graph.pojo</targetPackage>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-sources</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.streams</groupId>
+                        <artifactId>streams-http</artifactId>
+                        <version>${project.version}</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <includes>**/*.json</includes>
+                    <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                    <includeGroupIds>org.apache.streams</includeGroupIds>
+                    <includeTypes>test-jar</includeTypes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test-resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>dockerITs</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>skipITs</name>
+                    <value>false</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>io.fabric8</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <configuration combine.self="override">
+                            <watchInterval>500</watchInterval>
+                            <logDate>default</logDate>
+                            <verbose>true</verbose>
+                            <autoPull>on</autoPull>
+                            <images>
+                                <image>
+                                    <name>neo4j:3.0.6</name>
+                                    <alias>neo4j</alias>
+                                    <run>
+                                        <env>
+                                            <NEO4J_AUTH>none</NEO4J_AUTH>
+                                        </env>
+                                        <namingStrategy>none</namingStrategy>
+                                        <ports>
+                                            <port>${neo4j.http.host}:${neo4j.http.port}:7474</port>
+                                            <port>${neo4j.tcp.host}:${neo4j.tcp.port}:7687</port>
+                                        </ports>
+                                        <portPropertyFile>neo4j.properties</portPropertyFile>
+                                        <wait>
+                                            <log>neo4j startup</log>
+                                            <http>
+                                                <url>http://${neo4j.http.host}:${neo4j.http.port}</url>
+                                                <method>GET</method>
+                                                <status>200</status>
+                                            </http>
+                                            <time>20000</time>
+                                            <kill>1000</kill>
+                                            <shutdown>500</shutdown>
+                                            <!--<tcp>-->
+                                            <!--<host>${es.transport.host}</host>-->
+                                            <!--<ports>-->
+                                            <!--<port>${es.transport.port}</port>-->
+                                            <!--</ports>-->
+                                            <!--</tcp>-->
+                                        </wait>
+                                        <log>
+                                            <enabled>true</enabled>
+                                            <date>default</date>
+                                            <color>cyan</color>
+                                        </log>
+                                    </run>
+                                    <watch>
+                                        <mode>none</mode>
+                                    </watch>
+                                </image>
+
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                </plugins>
+            </build>
+
+        </profile>
+    </profiles>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
new file mode 100644
index 0000000..c117c16
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j;
+
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.stringtemplate.v4.ST;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Supporting class for interacting with neo4j via rest API
+ */
+public class CypherQueryGraphHelper implements QueryGraphHelper, Serializable {
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CypherQueryGraphHelper.class);
+
+  public static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v";
+  public static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
+  public static final String getVerticesLabelIdStatementTemplate = "MATCH (v:<type>) RETURN v";
+
+  public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
+      "CREATE UNIQUE (v:`<type>` { props }) "+
+      "ON CREATE SET v <labels> "+
+      "RETURN v";
+
+  public final static String mergeVertexStatementTemplate = "MERGE (v:`<type>` {id: '<id>'}) "+
+      "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+      "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+      "RETURN v";
+
+  public final static String createEdgeStatementTemplate = "MATCH (s:`<s_type>` {id: '<s_id>'}),(d:`<d_type>` {id: '<d_id>'}) "+
+      "CREATE UNIQUE (s)-[r:`<r_type>` <r_props>]->(d) "+
+      "RETURN r";
+
+  public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
+
+    ST getVertex = new ST(getVertexStringIdStatementTemplate);
+    getVertex.add("id", streamsId);
+
+    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
+
+    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * getVertexRequest.
+   * @param vertexId numericId
+   * @return pair (streamsId, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
+
+    ST getVertex = new ST(getVertexLongIdStatementTemplate);
+    getVertex.add("id", vertexId);
+
+    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
+
+    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
+
+    return queryPlusParameters;
+
+  }
+
+  /**
+   * createVertexRequest.
+   * @param activityObject activityObject
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
+
+    Objects.requireNonNull(activityObject.getObjectType());
+
+    List<String> labels = getLabels(activityObject);
+
+    ST createVertex = new ST(createVertexStatementTemplate);
+    createVertex.add("id", activityObject.getId());
+    createVertex.add("type", activityObject.getObjectType());
+
+    if ( labels.size() > 0 ) {
+      createVertex.add("labels", String.join(" ", labels));
+    }
+
+    String query = createVertex.render();
+
+    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props);
+
+    LOGGER.debug("createVertexRequest: ({},{})", query, props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * getVerticesRequest gets all vertices with a label.
+   * @param labelId labelId
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> getVerticesRequest(String labelId) {
+    ST getVertex = new ST(getVerticesLabelIdStatementTemplate);
+    getVertex.add("type", labelId);
+
+    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
+
+    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * mergeVertexRequest.
+   * @param activityObject activityObject
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) {
+
+    Objects.requireNonNull(activityObject.getObjectType());
+
+    Pair queryPlusParameters = new Pair(null, new HashMap<>());
+
+    List<String> labels = getLabels(activityObject);
+
+    ST mergeVertex = new ST(mergeVertexStatementTemplate);
+    mergeVertex.add("id", activityObject.getId());
+    mergeVertex.add("type", activityObject.getObjectType());
+    if ( labels.size() > 0 ) {
+      mergeVertex.add("labels", String.join(" ", labels));
+    }
+    String query = mergeVertex.render();
+
+    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
+
+    queryPlusParameters = queryPlusParameters.setAt0(query);
+    queryPlusParameters = queryPlusParameters.setAt1(props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * createActorObjectEdge.
+   * @param activity activity
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> createActorObjectEdge(Activity activity) {
+
+    Pair queryPlusParameters = new Pair(null, new HashMap<>());
+
+    ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    ST mergeEdge = new ST(createEdgeStatementTemplate);
+    mergeEdge.add("s_id", activity.getActor().getId());
+    mergeEdge.add("s_type", activity.getActor().getObjectType());
+    mergeEdge.add("d_id", activity.getObject().getId());
+    mergeEdge.add("d_type", activity.getObject().getObjectType());
+    mergeEdge.add("r_id", activity.getId());
+    mergeEdge.add("r_type", activity.getVerb());
+    mergeEdge.add("r_props", getActorObjectEdgePropertyCreater(props));
+
+    String statement = mergeEdge.render();
+    queryPlusParameters = queryPlusParameters.setAt0(statement);
+    queryPlusParameters = queryPlusParameters.setAt1(props);
+
+    LOGGER.debug("createActorObjectEdge: ({},{})", statement, props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * createActorTargetEdge.
+   * @param activity activity
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> createActorTargetEdge(Activity activity) {
+
+    Pair queryPlusParameters = new Pair(null, new HashMap<>());
+
+    ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    ST mergeEdge = new ST(createEdgeStatementTemplate);
+    mergeEdge.add("s_id", activity.getActor().getId());
+    mergeEdge.add("s_type", activity.getActor().getObjectType());
+    mergeEdge.add("d_id", activity.getTarget().getId());
+    mergeEdge.add("d_type", activity.getTarget().getObjectType());
+    mergeEdge.add("r_id", activity.getId());
+    mergeEdge.add("r_type", activity.getVerb());
+    mergeEdge.add("r_props", getActorTargetEdgePropertyCreater(props));
+
+    String statement = mergeEdge.render();
+    queryPlusParameters = queryPlusParameters.setAt0(statement);
+    queryPlusParameters = queryPlusParameters.setAt1(props);
+
+    LOGGER.debug("createActorObjectEdge: ({},{})", statement, props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * getPropertyValueSetter.
+   * @param map paramMap
+   * @return PropertyValueSetter string
+   */
+  public static String getPropertyValueSetter(Map<String, Object> map, String symbol) {
+    StringBuilder builder = new StringBuilder();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String)(entry.getValue());
+        builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + StringEscapeUtils.escapeJava(propVal) + "'");
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * getPropertyParamSetter.
+   * @param map paramMap
+   * @return PropertyParamSetter string
+   */
+  public static String getPropertyParamSetter(Map<String, Object> map, String symbol) {
+    StringBuilder builder = new StringBuilder();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String)(entry.getValue());
+        builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + StringEscapeUtils.escapeJava(propVal) + "'");
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * getPropertyCreater.
+   * @param map paramMap
+   * @return PropertyCreater string
+   */
+  public static String getPropertyCreater(Map<String, Object> map) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("{ ");
+    List<String> parts = new ArrayList<>();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String) (entry.getValue());
+        parts.add("`"+entry.getKey() + "`:'" + StringEscapeUtils.escapeJava(propVal) + "'");
+      }
+    }
+    builder.append(String.join(" ", parts));
+    builder.append(" }");
+    return builder.toString();
+  }
+
+  private String getActorObjectEdgePropertyCreater(Map<String, Object> map) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("{ ");
+    List<String> parts = new ArrayList<>();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String) (entry.getValue());
+        if( !entry.getKey().contains(".")) {
+          parts.add("`"+entry.getKey() + "`: '" + StringEscapeUtils.escapeJava(propVal) + "'");
+        }
+      }
+    }
+    builder.append(String.join(", ", parts));
+    builder.append(" }");
+    return builder.toString();
+  }
+
+  private String getActorTargetEdgePropertyCreater(Map<String, Object> map) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("{ ");
+    List<String> parts = new ArrayList<>();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String) (entry.getValue());
+        if( !entry.getKey().contains(".")) {
+          parts.add("`"+entry.getKey() + "`: '" + StringEscapeUtils.escapeJava(propVal) + "'");
+        } else if( entry.getKey().startsWith("object.") && !entry.getKey().contains(".id")) {
+          parts.add("`"+entry.getKey().substring("object.".length()) + "`: '" + StringEscapeUtils.escapeJava(propVal) + "'");
+        }
+      }
+    }
+    builder.append(String.join(", ", parts));
+    builder.append(" }");
+    return builder.toString();
+  }
+
+  /**
+   * getLabels.
+   * @param activityObject activityObject
+   * @return PropertyCreater string
+   */
+  public static List<String> getLabels(ActivityObject activityObject) {
+    List<String> labels = Collections.singletonList(":streams");
+    if ( activityObject.getAdditionalProperties().containsKey("labels") ) {
+      List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+      for ( String extraLabel : extraLabels ) {
+        labels.add(":" + extraLabel);
+      }
+    }
+    return labels;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
new file mode 100644
index 0000000..6058c66
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
@@ -0,0 +1,151 @@
+package org.apache.streams.neo4j;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import org.apache.commons.lang3.StringUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by steve on 1/2/17.
+ */
+public class Neo4jPersistUtil {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
+
+  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  private static CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
+
+  public static List<Pair<String, Map<String, Object>>> prepareStatements(StreamsDatum entry) throws Exception {
+
+    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
+
+    String id = entry.getId();
+    Activity activity = null;
+    ActivityObject activityObject = null;
+    Object document = entry.getDocument();
+
+    if (document instanceof Activity) {
+      activity = (Activity) document;
+    } else if (document instanceof ActivityObject) {
+      activityObject = (ActivityObject) document;
+    } else {
+      ObjectNode objectNode;
+      if (document instanceof ObjectNode) {
+        objectNode = (ObjectNode) document;
+      } else if ( document instanceof String) {
+        try {
+          objectNode = mapper.readValue((String) document, ObjectNode.class);
+        } catch (IOException ex) {
+          LOGGER.error("Can't handle input: ", entry);
+          throw ex;
+        }
+      } else {
+        LOGGER.error("Can't handle input: ", entry);
+        throw new Exception("Can't create statements from datum.");
+      }
+
+      if ( objectNode.get("verb") != null ) {
+        try {
+          activity = mapper.convertValue(objectNode, Activity.class);
+          activityObject = activity.getObject();
+        } catch (Exception ex) {
+          activityObject = mapper.convertValue(objectNode, ActivityObject.class);
+        }
+      } else {
+        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
+      }
+
+    }
+
+    Preconditions.checkArgument(activity != null ^ activityObject != null);
+
+    if ( activityObject != null && !Strings.isNullOrEmpty(activityObject.getId())) {
+
+      statements.add(vertexStatement(activityObject));
+
+    } else if ( activity != null && !Strings.isNullOrEmpty(activity.getId())) {
+
+      statements.addAll(vertexStatements(activity));
+
+      statements.addAll(edgeStatements(activity));
+
+    }
+
+    return statements;
+  }
+
+  public static List<Pair<String, Map<String, Object>>> vertexStatements(Activity activity) {
+    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
+    ActivityObject actor = activity.getActor();
+    ActivityObject object = activity.getObject();
+    ActivityObject target = activity.getTarget();
+
+    if (actor != null && StringUtils.isNotBlank(actor.getId())) {
+      Pair<String, Map<String, Object>> actorStatement = vertexStatement(actor);
+      statements.add(actorStatement);
+    }
+
+    if (object != null && StringUtils.isNotBlank(object.getId())) {
+      Pair<String, Map<String, Object>> objectStatement = vertexStatement(object);
+      statements.add(objectStatement);
+    }
+
+    if (target != null && StringUtils.isNotBlank(target.getId())) {
+      Pair<String, Map<String, Object>> targetStatement = vertexStatement(target);
+      statements.add(targetStatement);
+    }
+
+    return statements;
+  }
+
+  public static List<Pair<String, Map<String, Object>>> edgeStatements(Activity activity) {
+    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
+    ActivityObject actor = activity.getActor();
+    ActivityObject object = activity.getObject();
+    ActivityObject target = activity.getTarget();
+
+    if (StringUtils.isNotBlank(actor.getId()) && object != null && StringUtils.isNotBlank(object.getId())) {
+      Pair<String, Map<String, Object>> actorObjectEdgeStatement = helper.createActorObjectEdge(activity);
+      Map<String, Object> props = new HashMap<>();
+      props.put("props", actorObjectEdgeStatement.getValue1());
+      actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
+      statements.add(actorObjectEdgeStatement);
+    }
+
+    if (StringUtils.isNotBlank(actor.getId()) && target != null && StringUtils.isNotBlank(target.getId())) {
+      Pair<String, Map<String, Object>> actorTargetEdgeStatement = helper.createActorTargetEdge(activity);
+      Map<String, Object> props = new HashMap<>();
+      props.put("props", actorTargetEdgeStatement.getValue1());
+      actorTargetEdgeStatement = actorTargetEdgeStatement.setAt1(props);
+      statements.add(actorTargetEdgeStatement);
+    }
+
+    return statements;
+  }
+
+  public static Pair<String, Map<String, Object>> vertexStatement(ActivityObject activityObject) {
+    Pair<String, Map<String, Object>> mergeVertexRequest = helper.mergeVertexRequest(activityObject);
+    Map<String, Object> props = new HashMap<>();
+    props.put("props", mergeVertexRequest.getValue1());
+    mergeVertexRequest = mergeVertexRequest.setAt1(props);
+    return mergeVertexRequest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
new file mode 100644
index 0000000..9bfc049
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
@@ -0,0 +1,92 @@
+package org.apache.streams.neo4j.bolt;
+
+import org.apache.streams.neo4j.Neo4jConfiguration;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.neo4j.driver.v1.AuthToken;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class Neo4jBoltClient {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(Neo4jBoltClient.class);
+
+    private Driver client;
+
+    public Neo4jConfiguration config;
+
+    private Neo4jBoltClient(Neo4jConfiguration neo4jConfiguration) {
+        this.config = neo4jConfiguration;
+        try {
+            this.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+            this.client = null;
+        }
+    }
+
+    private static Map<Neo4jConfiguration, Neo4jBoltClient> INSTANCE_MAP = new ConcurrentHashMap<Neo4jConfiguration, Neo4jBoltClient>();
+
+    public static Neo4jBoltClient getInstance(Neo4jConfiguration neo4jConfiguration) {
+        if ( INSTANCE_MAP != null &&
+             INSTANCE_MAP.size() > 0 &&
+             INSTANCE_MAP.containsKey(neo4jConfiguration)) {
+            return INSTANCE_MAP.get(neo4jConfiguration);
+        } else {
+            Neo4jBoltClient instance = new Neo4jBoltClient(neo4jConfiguration);
+            if( instance != null && instance.client != null ) {
+                INSTANCE_MAP.put(neo4jConfiguration, instance);
+                return instance;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    public void start() throws Exception {
+
+        Objects.nonNull(config);
+        assertThat("config.getScheme().startsWith(\"tcp\")", config.getScheme().startsWith("tcp"));
+
+        LOGGER.info("Neo4jConfiguration.start {}", config);
+
+        AuthToken authToken = null;
+        if( StringUtils.isNotBlank(config.getUsername()) && StringUtils.isNotBlank(config.getPassword())) {
+            authToken = AuthTokens.basic( config.getUsername(), config.getPassword() );
+        }
+
+        if( authToken == null ) {
+            client = GraphDatabase.driver("bolt://" + config.getHosts().get(0) + ":" + config.getPort());
+        } else {
+            client = GraphDatabase.driver("bolt://" + config.getHosts().get(0) + ":" + config.getPort(), authToken);
+        }
+
+        Objects.nonNull(client);
+
+    }
+
+    public void stop() throws Exception {
+        this.client.session().close();
+        this.client = null;
+    }
+
+    public Neo4jConfiguration config() {
+        return config;
+    }
+
+    public Driver client() {
+        return client;
+    }
+}