You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/01/08 18:36:30 UTC
[2/2] incubator-streams git commit: STREAMS-344: streams-persist-neo4j
STREAMS-344: streams-persist-neo4j
Squashed commit of the following:
commit 76207b1577a0fb6f05992c8700151223db20e4b3
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date: Sun Jan 8 12:29:32 2017 -0600
STREAMS-344: address PR feedback
from https://github.com/apache/incubator-streams/pull/348
commit ee700fd16e8631bdb0fb453d686beef4167af13b
Author: Steve Blackmon <sb...@apache.org>
Date: Mon Jan 2 19:42:33 2017 -0600
add constructor
commit 1f4e175cf84a208252d488c2858ea420af0642f9
Author: Steve Blackmon <sb...@apache.org>
Date: Mon Jan 2 18:11:01 2017 -0600
new neo4j module with bolt:// and http:// support, and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4bd22317
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4bd22317
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4bd22317
Branch: refs/heads/master
Commit: 4bd22317ea3a67b7dfdc0c9d3aba96a71f712e3a
Parents: 7810361
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Sun Jan 8 12:36:18 2017 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sun Jan 8 12:36:18 2017 -0600
----------------------------------------------------------------------
streams-contrib/pom.xml | 3 +-
streams-contrib/streams-persist-graph/pom.xml | 20 +-
.../streams/graph/GraphHttpPersistWriter.java | 250 --------------
.../apache/streams/graph/GraphVertexReader.java | 126 -------
.../apache/streams/graph/HttpGraphHelper.java | 4 +-
.../apache/streams/graph/QueryGraphHelper.java | 4 +-
.../graph/neo4j/CypherQueryGraphHelper.java | 238 -------------
.../graph/neo4j/Neo4jHttpGraphHelper.java | 75 ----
.../streams/graph/GraphBinaryConfiguration.json | 28 --
.../streams/graph/GraphConfiguration.json | 22 --
.../streams/graph/GraphHttpConfiguration.json | 22 --
.../graph/neo4j/CypherQueryResponse.json | 43 ---
.../graph/test/TestCypherQueryGraphHelper.java | 116 -------
.../graph/test/TestNeo4jHttpVertexReader.java | 81 -----
streams-contrib/streams-persist-neo4j/pom.xml | 263 ++++++++++++++
.../streams/neo4j/CypherQueryGraphHelper.java | 344 +++++++++++++++++++
.../apache/streams/neo4j/Neo4jPersistUtil.java | 151 ++++++++
.../streams/neo4j/bolt/Neo4jBoltClient.java | 92 +++++
.../neo4j/bolt/Neo4jBoltPersistReader.java | 326 ++++++++++++++++++
.../neo4j/bolt/Neo4jBoltPersistWriter.java | 77 +++++
.../streams/neo4j/http/Neo4jHttpClient.java | 74 ++++
.../neo4j/http/Neo4jHttpGraphHelper.java | 104 ++++++
.../neo4j/http/Neo4jHttpPersistReader.java | 173 ++++++++++
.../neo4j/http/Neo4jHttpPersistWriter.java | 171 +++++++++
.../streams/neo4j/CypherQueryResponse.json | 43 +++
.../streams/neo4j/Neo4jConfiguration.json | 27 ++
.../streams/neo4j/Neo4jReaderConfiguration.json | 17 +
.../streams/neo4j/test/Neo4jBoltPersistIT.java | 156 +++++++++
.../streams/neo4j/test/Neo4jHttpPersistIT.java | 138 ++++++++
.../neo4j/test/TestCypherQueryGraphHelper.java | 150 ++++++++
.../src/test/resources/Neo4jBoltPersistIT.conf | 20 ++
.../src/test/resources/Neo4jHttpPersistIT.conf | 20 ++
.../apache/streams/data/util/PropertyUtil.java | 124 -------
.../org/apache/streams/util/PropertyUtil.java | 130 +++++++
.../util/schema/test/PropertyUtilTest.java | 25 ++
35 files changed, 2510 insertions(+), 1147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 8408cef..aed60c9 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -37,8 +37,8 @@
</properties>
<modules>
- <module>streams-persist-console</module>
<module>streams-persist-cassandra</module>
+ <module>streams-persist-console</module>
<module>streams-persist-elasticsearch</module>
<module>streams-persist-filebuffer</module>
<module>streams-persist-hbase</module>
@@ -46,6 +46,7 @@
<module>streams-persist-graph</module>
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
+ <module>streams-persist-neo4j</module>
<module>streams-amazon-aws</module>
<module>streams-processor-jackson</module>
<module>streams-processor-json</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml
index b8db538..996c706 100644
--- a/streams-contrib/streams-persist-graph/pom.xml
+++ b/streams-contrib/streams-persist-graph/pom.xml
@@ -147,25 +147,7 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <includes>**/*.json</includes>
- <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
- <includeGroupIds>org.apache.streams</includeGroupIds>
- <includeTypes>test-jar</includeTypes>
- </configuration>
- <executions>
- <execution>
- <id>test-resource-dependencies</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>unpack-dependencies</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+
<!-- revisit using this if streams bumps to jdk8 -->
<!--<plugin>-->
<!--<groupId>com.github.harti2006</groupId>-->
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
deleted file mode 100644
index 5b2dec6..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
-import org.apache.streams.graph.neo4j.Neo4jHttpGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Adds activityobjects as vertices and activities as edges to a graph database with
- * an http rest endpoint (such as neo4j).
- */
-public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
-
- public static final String STREAMS_ID = GraphHttpPersistWriter.class.getCanonicalName();
-
- private static final Logger LOGGER = LoggerFactory.getLogger(GraphHttpPersistWriter.class);
- private static final long MAX_WRITE_LATENCY = 1000;
-
- private GraphHttpConfiguration configuration;
-
- private QueryGraphHelper queryGraphHelper;
- private HttpGraphHelper httpGraphHelper;
-
- private static ObjectMapper mapper;
-
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- /**
- * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from JVM 'graph'.
- */
- public GraphHttpPersistWriter() {
- this(new ComponentConfigurator<>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
- }
-
- /**
- * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration.
- * @param configuration GraphHttpConfiguration
- */
- public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
- super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class));
- if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
- super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit/");
- } else if ( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
- super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
- }
- this.configuration = configuration;
- }
-
- @Override
- protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
-
- Activity activity = null;
- ActivityObject activityObject;
- Object document = entry.getDocument();
-
- if (document instanceof Activity) {
- activity = (Activity) document;
- activityObject = activity.getObject();
- } else if (document instanceof ActivityObject) {
- activityObject = (ActivityObject) document;
- } else {
- ObjectNode objectNode;
- if (document instanceof ObjectNode) {
- objectNode = (ObjectNode) document;
- } else if ( document instanceof String) {
- try {
- objectNode = mapper.readValue((String) document, ObjectNode.class);
- } catch (IOException ex) {
- LOGGER.error("Can't handle input: ", entry);
- throw ex;
- }
- } else {
- LOGGER.error("Can't handle input: ", entry);
- throw new Exception("Can't create payload from datum.");
- }
-
- if ( objectNode.get("verb") != null ) {
- try {
- activity = mapper.convertValue(objectNode, Activity.class);
- activityObject = activity.getObject();
- } catch (Exception ex) {
- activityObject = mapper.convertValue(objectNode, ActivityObject.class);
- }
- } else {
- activityObject = mapper.convertValue(objectNode, ActivityObject.class);
- }
- }
-
- Preconditions.checkArgument(activity != null || activityObject != null);
-
- ObjectNode request = mapper.createObjectNode();
- ArrayNode statements = mapper.createArrayNode();
-
- // always add vertices first
-
- List<String> labels = Collections.singletonList("streams");
-
- if ( activityObject != null ) {
- if ( activityObject.getObjectType() != null ) {
- labels.add(activityObject.getObjectType());
- }
- statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
- }
-
- if ( activity != null ) {
-
- ActivityObject actor = activity.getActor();
- Provider provider = activity.getProvider();
-
- if (provider != null && StringUtils.isNotBlank(provider.getId())) {
- labels.add(provider.getId());
- }
- if (actor != null && StringUtils.isNotBlank(actor.getId())) {
- if (actor.getObjectType() != null) {
- labels.add(actor.getObjectType());
- }
- statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor)));
- }
-
- if (activityObject != null && StringUtils.isNotBlank(activityObject.getId())) {
- if (activityObject.getObjectType() != null) {
- labels.add(activityObject.getObjectType());
- }
- statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
- }
-
- // then add edge
-
- if (StringUtils.isNotBlank(activity.getVerb())) {
- statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity)));
- }
- }
-
- request.put("statements", statements);
- return request;
-
- }
-
- @Override
- protected ObjectNode executePost(HttpPost httpPost) {
-
- Objects.requireNonNull(httpPost);
-
- ObjectNode result = null;
-
- CloseableHttpResponse response = null;
-
- String entityString = null;
- try {
- response = httpclient.execute(httpPost);
- HttpEntity entity = response.getEntity();
- if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) {
- entityString = EntityUtils.toString(entity);
- result = mapper.readValue(entityString, ObjectNode.class);
- }
- LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString);
- if ( result == null
- || (
- result.get("errors") != null
- && result.get("errors").isArray()
- && result.get("errors").iterator().hasNext()
- )
- ) {
- LOGGER.error("Write Error: " + result.get("errors"));
- } else {
- LOGGER.debug("Write Success");
- }
- } catch (IOException ex) {
- LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage());
- } catch (Exception ex) {
- LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage());
- } finally {
- try {
- if ( response != null) {
- response.close();
- }
- } catch (IOException ignored) {
- LOGGER.trace("ignored IOException", ignored);
- }
- }
- return result;
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
- super.prepare(configuration);
- mapper = StreamsJacksonMapper.getInstance();
-
- if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
- queryGraphHelper = new CypherQueryGraphHelper();
- httpGraphHelper = new Neo4jHttpGraphHelper();
- }
-
- Objects.requireNonNull(queryGraphHelper);
- Objects.requireNonNull(httpGraphHelper);
- }
-
- @Override
- public void cleanUp() {
-
- LOGGER.info("exiting");
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
deleted file mode 100644
index 9560083..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import org.apache.streams.components.http.HttpProviderConfiguration;
-import org.apache.streams.components.http.provider.SimpleHttpProvider;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.neo4j.CypherQueryResponse;
-import org.apache.streams.graph.neo4j.ItemData;
-import org.apache.streams.graph.neo4j.ItemMetadata;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Reads a stream of activityobjects from vertices in a graph database with
- * an http rest endpoint (such as neo4j).
- */
-public class GraphVertexReader extends SimpleHttpProvider implements StreamsPersistReader {
-
- public static final String STREAMS_ID = GraphVertexReader.class.getCanonicalName();
-
- private static final Logger LOGGER = LoggerFactory.getLogger(GraphVertexReader.class);
-
- protected GraphReaderConfiguration configuration;
-
- private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- /**
- * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 'graph'.
- */
- public GraphVertexReader() {
- this(new ComponentConfigurator<>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
- }
-
- /**
- * GraphVertexReader constructor - use supplied GraphReaderConfiguration.
- * @param configuration GraphReaderConfiguration
- */
- public GraphVertexReader(GraphReaderConfiguration configuration) {
- super(mapper.convertValue(configuration, HttpProviderConfiguration.class));
- if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
- super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
- } else if ( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
- super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
- }
- this.configuration = configuration;
- }
-
- /**
- * Neo API query returns something like this:
- * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { "data": { props }, etc... } ] ] }
- *
- * @param jsonNode jsonNode
- * @return result
- */
- public List<ObjectNode> parse(JsonNode jsonNode) {
- List<ObjectNode> results = new ArrayList<>();
-
- ObjectNode root = (ObjectNode) jsonNode;
-
- CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, CypherQueryResponse.class);
-
- for ( List<List<ItemMetadata>> dataWrapper : cypherQueryResponse.getData()) {
-
- for (List<ItemMetadata> itemMetadatas : dataWrapper) {
-
- for (ItemMetadata itemMetadata : itemMetadatas) {
-
- ItemData itemData = itemMetadata.getData();
-
- LOGGER.debug("itemData: " + itemData);
-
- results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.'));
- }
-
- }
-
- }
- return results;
- }
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
- super.prepare(configurationObject);
-
- }
-
- @Override
- public StreamsResultSet readAll() {
- return readCurrent();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
index ca1f4e4..804e9ff 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
@@ -29,6 +29,8 @@ import java.util.Map;
*/
public interface HttpGraphHelper {
- ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters);
+ ObjectNode readData(Pair<String, Map<String, Object>> queryPlusParameters);
+
+ ObjectNode writeData(Pair<String, Map<String, Object>> queryPlusParameters);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
index 1699aee..38ceb55 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
@@ -39,6 +39,8 @@ public interface QueryGraphHelper {
public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject);
- public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity);
+ public Pair<String, Map<String, Object>> createActorObjectEdge(Activity activity);
+
+ public Pair<String, Map<String, Object>> createActorTargetEdge(Activity activity);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
deleted file mode 100644
index a361139..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.neo4j;
-
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.QueryGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.javatuples.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.stringtemplate.v4.ST;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Supporting class for interacting with neo4j via rest API
- */
-public class CypherQueryGraphHelper implements QueryGraphHelper {
-
- private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
-
- private static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v";
- private static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
-
- private static final String createVertexStatementTemplate =
- "MATCH (x {id: '<id>'}) "
- + "CREATE UNIQUE (v:<type> { props }) "
- + "ON CREATE SET v <labels> "
- + "RETURN v";
-
-
-
- private static final String mergeVertexStatementTemplate =
- "MERGE (v:<type> {id: '<id>'}) "
- + "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "
- + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "
- + "RETURN v";
-
- private static final String createEdgeStatementTemplate =
- "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "
- + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "
- + "RETURN r";
-
- /**
- * getVertexRequest.
- * @param streamsId streamsId
- * @return pair (streamsId, parameterMap)
- */
- public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
-
- ST getVertex = new ST(getVertexStringIdStatementTemplate);
- getVertex.add("id", streamsId);
-
- Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
-
- LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
-
- return queryPlusParameters;
- }
-
- /**
- * getVertexRequest.
- * @param vertexId numericId
- * @return pair (streamsId, parameterMap)
- */
- @Override
- public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
-
- ST getVertex = new ST(getVertexLongIdStatementTemplate);
- getVertex.add("id", vertexId);
-
- Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
-
- LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
-
- return queryPlusParameters;
-
- }
-
- /**
- * createVertexRequest.
- * @param activityObject activityObject
- * @return pair (query, parameterMap)
- */
- public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
-
- Objects.requireNonNull(activityObject.getObjectType());
-
- List<String> labels = getLabels(activityObject);
-
- ST createVertex = new ST(createVertexStatementTemplate);
- createVertex.add("id", activityObject.getId());
- createVertex.add("type", activityObject.getObjectType());
-
- if ( labels.size() > 0 ) {
- createVertex.add("labels", String.join(" ", labels));
- }
-
- String query = createVertex.render();
-
- ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
- Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
- Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props);
-
- LOGGER.debug("createVertexRequest: ({},{})", query, props);
-
- return queryPlusParameters;
- }
-
- /**
- * mergeVertexRequest.
- * @param activityObject activityObject
- * @return pair (query, parameterMap)
- */
- public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) {
-
- Objects.requireNonNull(activityObject.getObjectType());
-
- Pair queryPlusParameters = new Pair(null, new HashMap<>());
-
- List<String> labels = getLabels(activityObject);
-
- ST mergeVertex = new ST(mergeVertexStatementTemplate);
- mergeVertex.add("id", activityObject.getId());
- mergeVertex.add("type", activityObject.getObjectType());
- if ( labels.size() > 0 ) {
- mergeVertex.add("labels", String.join(" ", labels));
- }
- String query = mergeVertex.render();
-
- ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
- Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
- LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
-
- queryPlusParameters = queryPlusParameters.setAt0(query);
- queryPlusParameters = queryPlusParameters.setAt1(props);
-
- return queryPlusParameters;
- }
-
- /**
- * createEdgeRequest.
- * @param activity activity
- * @return pair (query, parameterMap)
- */
- public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) {
-
- Pair queryPlusParameters = new Pair(null, new HashMap<>());
-
- ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
- Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
- ST mergeEdge = new ST(createEdgeStatementTemplate);
- mergeEdge.add("s_id", activity.getActor().getId());
- mergeEdge.add("s_type", activity.getActor().getObjectType());
- mergeEdge.add("d_id", activity.getObject().getId());
- mergeEdge.add("d_type", activity.getObject().getObjectType());
- mergeEdge.add("r_id", activity.getId());
- mergeEdge.add("r_type", activity.getVerb());
- mergeEdge.add("r_props", getPropertyCreater(props));
-
- // set the activityObject's and extensions null, because their properties don't need to appear on the relationship
- activity.setActor(null);
- activity.setObject(null);
- activity.setTarget(null);
- activity.getAdditionalProperties().put("extensions", null);
-
- String statement = mergeEdge.render();
- queryPlusParameters = queryPlusParameters.setAt0(statement);
- queryPlusParameters = queryPlusParameters.setAt1(props);
-
- LOGGER.debug("createEdgeRequest: ({},{})", statement, props);
-
- return queryPlusParameters;
- }
-
- /**
- * getPropertyCreater.
- * @param map paramMap
- * @return PropertyCreater string
- */
- public static String getPropertyCreater(Map<String, Object> map) {
- StringBuilder builder = new StringBuilder();
- builder.append("{");
- List<String> parts = new ArrayList<>();
- for ( Map.Entry<String, Object> entry : map.entrySet()) {
- if ( entry.getValue() instanceof String ) {
- String propVal = (String) (entry.getValue());
- parts.add("`" + entry.getKey() + "`:'" + propVal + "'");
- }
- }
- builder.append(String.join(",", parts));
- builder.append("}");
- return builder.toString();
- }
-
- private List<String> getLabels(ActivityObject activityObject) {
- List<String> labels = Collections.singletonList(":streams");
- if ( activityObject.getAdditionalProperties().containsKey("labels") ) {
- List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
- for ( String extraLabel : extraLabels ) {
- labels.add(":" + extraLabel);
- }
- }
- return labels;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
deleted file mode 100644
index 9f47058..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.neo4j;
-
-import org.apache.streams.graph.HttpGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.javatuples.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Supporting class for interacting with neo4j via rest API.
- */
-public class Neo4jHttpGraphHelper implements HttpGraphHelper {
-
- private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
-
- private static final String statementKey = "statement";
- private static final String paramsKey = "parameters";
- private static final String propsKey = "props";
-
- /**
- * createHttpRequest neo4j rest json payload.
- *
- * @param queryPlusParameters (query, parameter map)
- * @return ObjectNode
- */
- public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) {
-
- LOGGER.debug("createHttpRequest: ", queryPlusParameters);
-
- Objects.requireNonNull(queryPlusParameters);
- Objects.requireNonNull(queryPlusParameters.getValue0());
- Objects.requireNonNull(queryPlusParameters.getValue1());
-
- ObjectNode request = MAPPER.createObjectNode();
-
- request.put(statementKey, queryPlusParameters.getValue0());
-
- ObjectNode params = MAPPER.createObjectNode();
- ObjectNode props = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class);
-
- params.put(propsKey, props);
- request.put(paramsKey, params);
-
- LOGGER.debug("createHttpRequest: ", request);
-
- return request;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
deleted file mode 100644
index 04a70e1..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "javaType" : "org.apache.streams.graph.GraphBinaryConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "type": {
- "type": "string",
- "description": "Graph DB type",
- "enum" : ["neo4j", "gremlin"]
- },
- "file": {
- "type": "string",
- "description": "New Graph DB File"
- },
- "indexFields": {
- "type": "array",
- "items": {
- "type": "string"
- },
- "description": "Fields to index under streams label"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
deleted file mode 100644
index de92443..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "javaType" : "org.apache.streams.graph.GraphConfiguration",
- "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "type": {
- "type": "string",
- "description": "Graph DB type",
- "enum" : ["neo4j", "rexster"]
- },
- "graph": {
- "type": "string",
- "description": "Graph DB Graph ID"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
deleted file mode 100644
index c63e0fb..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "javaType" : "org.apache.streams.graph.GraphHttpConfiguration",
- "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "type": {
- "type": "string",
- "description": "Graph DB type",
- "enum" : ["neo4j", "rexster"]
- },
- "graph": {
- "type": "string",
- "description": "Graph DB Graph ID"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
deleted file mode 100644
index 77c6fd7..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
+++ /dev/null
@@ -1,43 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "javaType" : "org.apache.streams.graph.neo4j.CypherQueryResponse",
- "properties": {
- "columns": {
- "type": "array",
- "id": "http://jsonschema.net/columns",
- "required": false,
- "items": {
- "type": "string",
- "id": "http://jsonschema.net/columns/0",
- "required": false
- }
- },
- "data": {
- "type": "array",
- "required": false,
- "items": {
- "type": "array",
- "required": false,
- "items": {
- "type": "array",
- "required": false,
- "items": {
- "type": "object",
- "javaType" : "org.apache.streams.graph.neo4j.ItemMetadata",
- "properties": {
- "data": {
- "type": "object",
- "javaType" : "org.apache.streams.graph.neo4j.ItemData"
- }
- }
- }
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
deleted file mode 100644
index c29c8b7..0000000
--- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.test;
-
-import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-
-import org.javatuples.Pair;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- * TestCypherQueryGraphHelper tests
- * @see org.apache.streams.graph.neo4j.CypherQueryGraphHelper
- */
-public class TestCypherQueryGraphHelper {
-
- CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
-
- @Test
- public void getVertexRequestIdTest() throws Exception {
-
- Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id");
- assert(queryAndParams != null);
- assert(queryAndParams.getValue0() != null);
-
- }
-
- @Test
- public void getVertexRequestLongTest() throws Exception {
-
- Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(new Long(1));
-
- assert(queryAndParams != null);
- assert(queryAndParams.getValue0() != null);
-
- }
-
- @Test
- public void createVertexRequestTest() throws Exception {
-
- ActivityObject activityObject = new ActivityObject();
- activityObject.setId("id");
- activityObject.setObjectType("type");
- activityObject.setContent("content");
-
- Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject);
- assert(queryAndParams != null);
- assert(queryAndParams.getValue0() != null);
- assert(queryAndParams.getValue1() != null);
-
- }
-
- @Test
- public void mergeVertexRequestTest() throws Exception {
-
- ActivityObject activityObject = new ActivityObject();
- activityObject.setId("id");
- activityObject.setObjectType("type");
- activityObject.setContent("content");
-
- Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject);
- assert(queryAndParams != null);
- assert(queryAndParams.getValue0() != null);
- assert(queryAndParams.getValue1() != null);
-
- }
-
- @Test
- public void createEdgeRequestTest() throws Exception {
-
- ActivityObject actor = new ActivityObject();
- actor.setId("actor");
- actor.setObjectType("type");
- actor.setContent("content");
-
- ActivityObject object = new ActivityObject();
- object.setId("object");
- object.setObjectType("type");
- object.setContent("content");
-
- Activity activity = new Activity();
- activity.setId("activity");
- activity.setVerb("verb");
- activity.setContent("content");
-
- activity.setActor(actor);
- activity.setObject(object);
-
- Pair<String, Map<String, Object>> queryAndParams = helper.createEdgeRequest(activity);
-
- assert(queryAndParams != null);
- assert(queryAndParams.getValue0() != null);
- assert(queryAndParams.getValue1() != null);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
deleted file mode 100644
index 24ddd65..0000000
--- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.test;
-
-import org.apache.streams.graph.GraphHttpConfiguration;
-import org.apache.streams.graph.GraphReaderConfiguration;
-import org.apache.streams.graph.GraphVertexReader;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.IOUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-/**
- * Unit test for {@link org.apache.streams.graph.GraphVertexReader}
- *
- * Test that graph db responses can be converted to streams data.
- */
-public class TestNeo4jHttpVertexReader {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(TestNeo4jHttpVertexReader.class);
-
- private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- private JsonNode sampleReaderResult;
-
- private GraphReaderConfiguration testConfiguration;
-
- private GraphVertexReader graphPersistReader;
-
- @Before
- public void prepareTest() throws IOException {
-
- testConfiguration = new GraphReaderConfiguration();
- testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
-
- graphPersistReader = new GraphVertexReader(testConfiguration);
- InputStream testActivityFileStream = TestNeo4jHttpVertexReader.class.getClassLoader()
- .getResourceAsStream("sampleReaderResult.json");
- String sampleText = IOUtils.toString(testActivityFileStream, "utf-8");
- sampleReaderResult = mapper.readValue(sampleText, JsonNode.class);
-
- }
-
- @Test
- public void testParseNeoResult() throws IOException {
-
- List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult);
-
- assert( result.size() == 10);
-
- for( int i = 0 ; i < 10; i++ )
- assert( result.get(i).get("extensions").size() == 5);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/pom.xml b/streams-contrib/streams-persist-neo4j/pom.xml
new file mode 100644
index 0000000..d117558
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/pom.xml
@@ -0,0 +1,263 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-contrib</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.5-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-neo4j</artifactId>
+ <name>streams-persist-neo4j</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-converters</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-graph</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>stringtemplate</artifactId>
+ <version>4.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.javatuples</groupId>
+ <artifactId>javatuples</artifactId>
+ <version>1.2</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.neo4j.driver</groupId>
+ <artifactId>neo4j-java-driver</artifactId>
+ <version>1.0.6</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-schema-activitystreams</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-testing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.streams.plugins</groupId>
+ <artifactId>streams-plugin-pojo</artifactId>
+ <version>${project.version}</version>
+ <configuration>
+ <sourcePaths>
+ <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory>
+ <targetPackage>org.apache.streams.graph.pojo</targetPackage>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ <includeGroupIds>org.apache.streams</includeGroupIds>
+ <includeTypes>test-jar</includeTypes>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test-resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>dockerITs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>skipITs</name>
+ <value>false</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <configuration combine.self="override">
+ <watchInterval>500</watchInterval>
+ <logDate>default</logDate>
+ <verbose>true</verbose>
+ <autoPull>on</autoPull>
+ <images>
+ <image>
+ <name>neo4j:3.0.6</name>
+ <alias>neo4j</alias>
+ <run>
+ <env>
+ <NEO4J_AUTH>none</NEO4J_AUTH>
+ </env>
+ <namingStrategy>none</namingStrategy>
+ <ports>
+ <port>${neo4j.http.host}:${neo4j.http.port}:7474</port>
+ <port>${neo4j.tcp.host}:${neo4j.tcp.port}:7687</port>
+ </ports>
+ <portPropertyFile>neo4j.properties</portPropertyFile>
+ <wait>
+ <log>neo4j startup</log>
+ <http>
+ <url>http://${neo4j.http.host}:${neo4j.http.port}</url>
+ <method>GET</method>
+ <status>200</status>
+ </http>
+ <time>20000</time>
+ <kill>1000</kill>
+ <shutdown>500</shutdown>
+ <!--<tcp>-->
+ <!--<host>${es.transport.host}</host>-->
+ <!--<ports>-->
+ <!--<port>${es.transport.port}</port>-->
+ <!--</ports>-->
+ <!--</tcp>-->
+ </wait>
+ <log>
+ <enabled>true</enabled>
+ <date>default</date>
+ <color>cyan</color>
+ </log>
+ </run>
+ <watch>
+ <mode>none</mode>
+ </watch>
+ </image>
+
+ </images>
+ </configuration>
+
+ </plugin>
+
+ </plugins>
+ </build>
+
+ </profile>
+ </profiles>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
new file mode 100644
index 0000000..c117c16
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j;
+
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.stringtemplate.v4.ST;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Supporting class for interacting with neo4j via rest API
+ */
+public class CypherQueryGraphHelper implements QueryGraphHelper, Serializable {
+
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CypherQueryGraphHelper.class);
+
+ // Cypher statement templates. Each is passed to a StringTemplate (ST) instance;
+ // the <name> placeholders are filled with ST.add(name, value) before render().
+
+ // Look up a single vertex by its numeric node id.
+ public static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v";
+ // Look up vertices whose "id" property equals the given string.
+ public static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
+ // Return every vertex carrying the given label.
+ public static final String getVerticesLabelIdStatementTemplate = "MATCH (v:<type>) RETURN v";
+
+ // NOTE(review): ON CREATE SET is normally a sub-clause of MERGE, not CREATE UNIQUE,
+ // and the matched node x is never used afterwards - confirm this statement runs as intended.
+ public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
+ "CREATE UNIQUE (v:`<type>` { props }) "+
+ "ON CREATE SET v <labels> "+
+ "RETURN v";
+
+ // Create-or-update a vertex keyed by (type, id); both branches apply the same labels,
+ // replace the properties with { props } and stamp `@timestamp`.
+ public final static String mergeVertexStatementTemplate = "MERGE (v:`<type>` {id: '<id>'}) "+
+ "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+ "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+ "RETURN v";
+
+ // Create a relationship of type <r_type> from source vertex s to destination vertex d;
+ // <r_props> is an inline property map rendered into the statement text.
+ public final static String createEdgeStatementTemplate = "MATCH (s:`<s_type>` {id: '<s_id>'}),(d:`<d_type>` {id: '<d_id>'}) "+
+ "CREATE UNIQUE (s)-[r:`<r_type>` <r_props>]->(d) "+
+ "RETURN r";
+
+ /**
+  * getVertexRequest.
+  * @param streamsId value of the vertex "id" property; substituted into the query text as-is
+  * @return pair (query, parameterMap); the parameter map is null for this query
+  */
+ public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
+
+   ST getVertex = new ST(getVertexStringIdStatementTemplate);
+   getVertex.add("id", streamsId);
+
+   Pair<String, Map<String, Object>> queryPlusParameters = new Pair<>(getVertex.render(), null);
+
+   // The message needs a "{}" placeholder, otherwise SLF4J drops the argument.
+   LOGGER.debug("getVertexRequest: {}", queryPlusParameters);
+
+   return queryPlusParameters;
+ }
+
+ /**
+  * getVertexRequest.
+  * @param vertexId numeric node id
+  * @return pair (query, parameterMap); the parameter map is null for this query
+  */
+ public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
+
+   ST getVertex = new ST(getVertexLongIdStatementTemplate);
+   getVertex.add("id", vertexId);
+
+   Pair<String, Map<String, Object>> queryPlusParameters = new Pair<>(getVertex.render(), null);
+
+   // The message needs a "{}" placeholder, otherwise SLF4J drops the argument.
+   LOGGER.debug("getVertexRequest: {}", queryPlusParameters);
+
+   return queryPlusParameters;
+
+ }
+
+ /**
+  * createVertexRequest.
+  * @param activityObject activityObject; its objectType must be non-null
+  * @return pair (query, parameterMap)
+  * @throws NullPointerException if the objectType is null
+  */
+ public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
+
+   Objects.requireNonNull(activityObject.getObjectType());
+
+   List<String> labels = getLabels(activityObject);
+
+   ST createVertex = new ST(createVertexStatementTemplate);
+   createVertex.add("id", activityObject.getId());
+   createVertex.add("type", activityObject.getObjectType());
+
+   if ( labels.size() > 0 ) {
+     createVertex.add("labels", String.join(" ", labels));
+   }
+
+   String query = createVertex.render();
+
+   // Serialize the object to JSON and flatten it (with '.' as the delimiter) into the parameter map.
+   ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
+   Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+   // Reuse the statement rendered above instead of rendering the template a second time.
+   Pair<String, Map<String, Object>> queryPlusParameters = new Pair<>(query, props);
+
+   LOGGER.debug("createVertexRequest: ({},{})", query, props);
+
+   return queryPlusParameters;
+ }
+
+ /**
+  * getVerticesRequest gets all vertices with a label.
+  * @param labelId labelId; substituted into the query text as-is
+  * @return pair (query, parameterMap); the parameter map is null for this query
+  */
+ public Pair<String, Map<String, Object>> getVerticesRequest(String labelId) {
+   ST getVertices = new ST(getVerticesLabelIdStatementTemplate);
+   getVertices.add("type", labelId);
+
+   Pair<String, Map<String, Object>> queryPlusParameters = new Pair<>(getVertices.render(), null);
+
+   // Log under this method's own name, with a "{}" placeholder so the pair is actually printed.
+   LOGGER.debug("getVerticesRequest: {}", queryPlusParameters);
+
+   return queryPlusParameters;
+ }
+
+ /**
+  * mergeVertexRequest.
+  * @param activityObject activityObject; its objectType must be non-null
+  * @return pair (query, parameterMap)
+  * @throws NullPointerException if the objectType is null
+  */
+ public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) {
+
+   Objects.requireNonNull(activityObject.getObjectType());
+
+   List<String> labels = getLabels(activityObject);
+
+   ST mergeVertex = new ST(mergeVertexStatementTemplate);
+   mergeVertex.add("id", activityObject.getId());
+   mergeVertex.add("type", activityObject.getObjectType());
+   if ( labels.size() > 0 ) {
+     mergeVertex.add("labels", String.join(" ", labels));
+   }
+   String query = mergeVertex.render();
+
+   // Serialize the object to JSON and flatten it (with '.' as the delimiter) into the parameter map.
+   ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
+   Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+   LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
+
+   // Build the typed pair directly rather than mutating a raw placeholder pair.
+   return new Pair<>(query, props);
+ }
+
+ /**
+  * createActorObjectEdge builds the statement linking the activity's actor to its object.
+  * @param activity activity; its actor and object are dereferenced and must be non-null
+  * @return pair (query, parameterMap)
+  */
+ public Pair<String, Map<String, Object>> createActorObjectEdge(Activity activity) {
+
+   // Serialize the activity to JSON and flatten it (with '.' as the delimiter) into the parameter map.
+   ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
+   Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+   ST mergeEdge = new ST(createEdgeStatementTemplate);
+   mergeEdge.add("s_id", activity.getActor().getId());
+   mergeEdge.add("s_type", activity.getActor().getObjectType());
+   mergeEdge.add("d_id", activity.getObject().getId());
+   mergeEdge.add("d_type", activity.getObject().getObjectType());
+   // NOTE(review): createEdgeStatementTemplate has no <r_id> placeholder, so this value appears unused.
+   mergeEdge.add("r_id", activity.getId());
+   mergeEdge.add("r_type", activity.getVerb());
+   mergeEdge.add("r_props", getActorObjectEdgePropertyCreater(props));
+
+   String statement = mergeEdge.render();
+
+   LOGGER.debug("createActorObjectEdge: ({},{})", statement, props);
+
+   // Build the typed pair directly rather than mutating a raw placeholder pair.
+   return new Pair<>(statement, props);
+ }
+
+ /**
+  * createActorTargetEdge builds the statement linking the activity's actor to its target.
+  * @param activity activity; its actor and target are dereferenced and must be non-null
+  * @return pair (query, parameterMap)
+  */
+ public Pair<String, Map<String, Object>> createActorTargetEdge(Activity activity) {
+
+   // Serialize the activity to JSON and flatten it (with '.' as the delimiter) into the parameter map.
+   ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
+   Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+   ST mergeEdge = new ST(createEdgeStatementTemplate);
+   mergeEdge.add("s_id", activity.getActor().getId());
+   mergeEdge.add("s_type", activity.getActor().getObjectType());
+   mergeEdge.add("d_id", activity.getTarget().getId());
+   mergeEdge.add("d_type", activity.getTarget().getObjectType());
+   // NOTE(review): createEdgeStatementTemplate has no <r_id> placeholder, so this value appears unused.
+   mergeEdge.add("r_id", activity.getId());
+   mergeEdge.add("r_type", activity.getVerb());
+   mergeEdge.add("r_props", getActorTargetEdgePropertyCreater(props));
+
+   String statement = mergeEdge.render();
+
+   // Log under this method's own name; the original label was copied from createActorObjectEdge.
+   LOGGER.debug("createActorTargetEdge: ({},{})", statement, props);
+
+   // Build the typed pair directly rather than mutating a raw placeholder pair.
+   return new Pair<>(statement, props);
+ }
+
+ /**
+  * getPropertyValueSetter.
+  *
+  * <p>Appends a fragment of the form {@code ,symbol.`key` = 'value'} for every map entry
+  * whose value is a String; entries with any other value type are skipped.
+  *
+  * @param map paramMap
+  * @param symbol identifier placed in front of each property name
+  * @return PropertyValueSetter string, empty when the map holds no String values
+  */
+ public static String getPropertyValueSetter(Map<String, Object> map, String symbol) {
+   StringBuilder builder = new StringBuilder();
+   for (Map.Entry<String, Object> entry : map.entrySet()) {
+     if (entry.getValue() instanceof String) {
+       // escapeJava leaves single quotes alone, but the value is emitted inside a
+       // single-quoted literal, so quotes must be escaped as well or they end the literal.
+       String propVal = StringEscapeUtils.escapeJava((String) entry.getValue()).replace("'", "\\'");
+       builder.append(',').append(symbol)
+           .append(".`").append(entry.getKey())
+           .append("` = '").append(propVal).append('\'');
+     }
+   }
+   return builder.toString();
+ }
+
+ /**
+  * getPropertyParamSetter.
+  *
+  * <p>Appends a fragment of the form {@code ,symbol.`key` = 'value'} for every map entry
+  * whose value is a String; entries with any other value type are skipped.
+  *
+  * <p>NOTE(review): despite the name, values are inlined as quoted literals rather than
+  * referenced as query parameters - confirm this is intended.
+  *
+  * @param map paramMap
+  * @param symbol identifier placed in front of each property name
+  * @return PropertyParamSetter string, empty when the map holds no String values
+  */
+ public static String getPropertyParamSetter(Map<String, Object> map, String symbol) {
+   StringBuilder builder = new StringBuilder();
+   for (Map.Entry<String, Object> entry : map.entrySet()) {
+     if (entry.getValue() instanceof String) {
+       // escapeJava leaves single quotes alone, but the value is emitted inside a
+       // single-quoted literal, so quotes must be escaped as well or they end the literal.
+       String propVal = StringEscapeUtils.escapeJava((String) entry.getValue()).replace("'", "\\'");
+       builder.append(',').append(symbol)
+           .append(".`").append(entry.getKey())
+           .append("` = '").append(propVal).append('\'');
+     }
+   }
+   return builder.toString();
+ }
+
+ /**
+  * getPropertyCreater.
+  *
+  * <p>Renders the String-valued entries of the map as a map literal of the form
+  * {@code { `key`:'value', `key2`:'value2' }}; entries with any other value type are skipped.
+  *
+  * @param map paramMap
+  * @return PropertyCreater string; <code>{  }</code> when the map holds no String values
+  */
+ public static String getPropertyCreater(Map<String, Object> map) {
+   List<String> parts = new ArrayList<>();
+   for (Map.Entry<String, Object> entry : map.entrySet()) {
+     if (entry.getValue() instanceof String) {
+       // escapeJava leaves single quotes alone, but the value is emitted inside a
+       // single-quoted literal, so quotes must be escaped as well or they end the literal.
+       String propVal = StringEscapeUtils.escapeJava((String) entry.getValue()).replace("'", "\\'");
+       parts.add("`" + entry.getKey() + "`:'" + propVal + "'");
+     }
+   }
+   // Entries of a map literal are comma separated (the edge property creaters already
+   // join this way); joining with a bare space broke any map with two or more properties.
+   return "{ " + String.join(", ", parts) + " }";
+ }
+
+ /**
+  * Renders the relationship properties for an actor-to-object edge as a map literal.
+  *
+  * <p>Only entries whose value is a String and whose key contains no '.' are included.
+  *
+  * @param map flattened property map
+  * @return map literal of the form {@code { `key`: 'value', ... }}
+  */
+ private String getActorObjectEdgePropertyCreater(Map<String, Object> map) {
+   List<String> parts = new ArrayList<>();
+   for (Map.Entry<String, Object> entry : map.entrySet()) {
+     boolean topLevelString = entry.getValue() instanceof String && !entry.getKey().contains(".");
+     if (topLevelString) {
+       // escapeJava leaves single quotes alone, but the value is emitted inside a
+       // single-quoted literal, so quotes must be escaped as well or they end the literal.
+       String propVal = StringEscapeUtils.escapeJava((String) entry.getValue()).replace("'", "\\'");
+       parts.add("`" + entry.getKey() + "`: '" + propVal + "'");
+     }
+   }
+   return "{ " + String.join(", ", parts) + " }";
+ }
+
+ private String getActorTargetEdgePropertyCreater(Map<String, Object> map) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{ ");
+ List<String> parts = new ArrayList<>();
+ for( Map.Entry<String, Object> entry : map.entrySet()) {
+ if( entry.getValue() instanceof String ) {
+ String propVal = (String) (entry.getValue());
+ if( !entry.getKey().contains(".")) {
+ parts.add("`"+entry.getKey() + "`: '" + StringEscapeUtils.escapeJava(propVal) + "'");
+ } else if( entry.getKey().startsWith("object.") && !entry.getKey().contains(".id")) {
+ parts.add("`"+entry.getKey().substring("object.".length()) + "`: '" + StringEscapeUtils.escapeJava(propVal) + "'");
+ }
+ }
+ }
+ builder.append(String.join(", ", parts));
+ builder.append(" }");
+ return builder.toString();
+ }
+
+ /**
+ * getLabels.
+ * @param activityObject activityObject
+ * @return PropertyCreater string
+ */
+ public static List<String> getLabels(ActivityObject activityObject) {
+ List<String> labels = Collections.singletonList(":streams");
+ if ( activityObject.getAdditionalProperties().containsKey("labels") ) {
+ List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+ for ( String extraLabel : extraLabels ) {
+ labels.add(":" + extraLabel);
+ }
+ }
+ return labels;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
new file mode 100644
index 0000000..6058c66
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
@@ -0,0 +1,151 @@
+package org.apache.streams.neo4j;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import org.apache.commons.lang3.StringUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by steve on 1/2/17.
+ */
+public class Neo4jPersistUtil {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
+
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private static CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
+
+ public static List<Pair<String, Map<String, Object>>> prepareStatements(StreamsDatum entry) throws Exception {
+
+ List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
+
+ String id = entry.getId();
+ Activity activity = null;
+ ActivityObject activityObject = null;
+ Object document = entry.getDocument();
+
+ if (document instanceof Activity) {
+ activity = (Activity) document;
+ } else if (document instanceof ActivityObject) {
+ activityObject = (ActivityObject) document;
+ } else {
+ ObjectNode objectNode;
+ if (document instanceof ObjectNode) {
+ objectNode = (ObjectNode) document;
+ } else if ( document instanceof String) {
+ try {
+ objectNode = mapper.readValue((String) document, ObjectNode.class);
+ } catch (IOException ex) {
+ LOGGER.error("Can't handle input: ", entry);
+ throw ex;
+ }
+ } else {
+ LOGGER.error("Can't handle input: ", entry);
+ throw new Exception("Can't create statements from datum.");
+ }
+
+ if ( objectNode.get("verb") != null ) {
+ try {
+ activity = mapper.convertValue(objectNode, Activity.class);
+ activityObject = activity.getObject();
+ } catch (Exception ex) {
+ activityObject = mapper.convertValue(objectNode, ActivityObject.class);
+ }
+ } else {
+ activityObject = mapper.convertValue(objectNode, ActivityObject.class);
+ }
+
+ }
+
+ Preconditions.checkArgument(activity != null ^ activityObject != null);
+
+ if ( activityObject != null && !Strings.isNullOrEmpty(activityObject.getId())) {
+
+ statements.add(vertexStatement(activityObject));
+
+ } else if ( activity != null && !Strings.isNullOrEmpty(activity.getId())) {
+
+ statements.addAll(vertexStatements(activity));
+
+ statements.addAll(edgeStatements(activity));
+
+ }
+
+ return statements;
+ }
+
+ public static List<Pair<String, Map<String, Object>>> vertexStatements(Activity activity) {
+ List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
+ ActivityObject actor = activity.getActor();
+ ActivityObject object = activity.getObject();
+ ActivityObject target = activity.getTarget();
+
+ if (actor != null && StringUtils.isNotBlank(actor.getId())) {
+ Pair<String, Map<String, Object>> actorStatement = vertexStatement(actor);
+ statements.add(actorStatement);
+ }
+
+ if (object != null && StringUtils.isNotBlank(object.getId())) {
+ Pair<String, Map<String, Object>> objectStatement = vertexStatement(object);
+ statements.add(objectStatement);
+ }
+
+ if (target != null && StringUtils.isNotBlank(target.getId())) {
+ Pair<String, Map<String, Object>> targetStatement = vertexStatement(target);
+ statements.add(targetStatement);
+ }
+
+ return statements;
+ }
+
+ public static List<Pair<String, Map<String, Object>>> edgeStatements(Activity activity) {
+ List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
+ ActivityObject actor = activity.getActor();
+ ActivityObject object = activity.getObject();
+ ActivityObject target = activity.getTarget();
+
+ if (StringUtils.isNotBlank(actor.getId()) && object != null && StringUtils.isNotBlank(object.getId())) {
+ Pair<String, Map<String, Object>> actorObjectEdgeStatement = helper.createActorObjectEdge(activity);
+ Map<String, Object> props = new HashMap<>();
+ props.put("props", actorObjectEdgeStatement.getValue1());
+ actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
+ statements.add(actorObjectEdgeStatement);
+ }
+
+ if (StringUtils.isNotBlank(actor.getId()) && target != null && StringUtils.isNotBlank(target.getId())) {
+ Pair<String, Map<String, Object>> actorTargetEdgeStatement = helper.createActorTargetEdge(activity);
+ Map<String, Object> props = new HashMap<>();
+ props.put("props", actorTargetEdgeStatement.getValue1());
+ actorTargetEdgeStatement = actorTargetEdgeStatement.setAt1(props);
+ statements.add(actorTargetEdgeStatement);
+ }
+
+ return statements;
+ }
+
+ public static Pair<String, Map<String, Object>> vertexStatement(ActivityObject activityObject) {
+ Pair<String, Map<String, Object>> mergeVertexRequest = helper.mergeVertexRequest(activityObject);
+ Map<String, Object> props = new HashMap<>();
+ props.put("props", mergeVertexRequest.getValue1());
+ mergeVertexRequest = mergeVertexRequest.setAt1(props);
+ return mergeVertexRequest;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
new file mode 100644
index 0000000..9bfc049
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
@@ -0,0 +1,92 @@
+package org.apache.streams.neo4j.bolt;
+
+import org.apache.streams.neo4j.Neo4jConfiguration;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.neo4j.driver.v1.AuthToken;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Holder for a Neo4j bolt {@link Driver}, cached per {@link Neo4jConfiguration}.
+ *
+ * <p>Instances are obtained through {@link #getInstance(Neo4jConfiguration)}; the driver is
+ * created while the instance is constructed.
+ */
+public class Neo4jBoltClient {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(Neo4jBoltClient.class);
+
+  private static final Map<Neo4jConfiguration, Neo4jBoltClient> INSTANCE_MAP = new ConcurrentHashMap<>();
+
+  private Driver client;
+
+  public Neo4jConfiguration config;
+
+  private Neo4jBoltClient(Neo4jConfiguration neo4jConfiguration) {
+    this.config = neo4jConfiguration;
+    try {
+      this.start();
+    } catch (Exception ex) {
+      // A null client marks this instance as unusable; getInstance then returns null.
+      LOGGER.error("Unable to start Neo4jBoltClient", ex);
+      this.client = null;
+    }
+  }
+
+  /**
+   * Returns the cached client for the configuration, creating and caching it on first use.
+   *
+   * @param neo4jConfiguration connection settings, also used as the cache key
+   * @return the client, or null when the configuration is null or the driver could not be created
+   */
+  public static Neo4jBoltClient getInstance(Neo4jConfiguration neo4jConfiguration) {
+    if (neo4jConfiguration == null) {
+      // ConcurrentHashMap rejects null keys; report "no client" instead of throwing.
+      return null;
+    }
+    Neo4jBoltClient cached = INSTANCE_MAP.get(neo4jConfiguration);
+    if (cached != null) {
+      return cached;
+    }
+    Neo4jBoltClient instance = new Neo4jBoltClient(neo4jConfiguration);
+    if (instance.client == null) {
+      return null;
+    }
+    INSTANCE_MAP.put(neo4jConfiguration, instance);
+    return instance;
+  }
+
+  /**
+   * Creates the driver for the first configured host, using basic authentication when both
+   * a username and a password are configured.
+   *
+   * @throws NullPointerException if the configuration is missing
+   * @throws IllegalArgumentException if the configured scheme does not start with "tcp"
+   * @throws Exception on any failure while creating the driver
+   */
+  public void start() throws Exception {
+
+    // Objects.nonNull only returns a boolean; requireNonNull actually enforces the check.
+    Objects.requireNonNull(config, "config");
+    // Validate with Preconditions rather than a test-library assertion, whose
+    // AssertionError bypassed the constructor's catch (Exception) block.
+    Preconditions.checkArgument(config.getScheme().startsWith("tcp"),
+        "config.getScheme().startsWith(\"tcp\")");
+
+    LOGGER.info("Neo4jConfiguration.start {}", config);
+
+    String uri = "bolt://" + config.getHosts().get(0) + ":" + config.getPort();
+
+    if (StringUtils.isNotBlank(config.getUsername()) && StringUtils.isNotBlank(config.getPassword())) {
+      AuthToken authToken = AuthTokens.basic(config.getUsername(), config.getPassword());
+      client = GraphDatabase.driver(uri, authToken);
+    } else {
+      client = GraphDatabase.driver(uri);
+    }
+
+    Objects.requireNonNull(client, "client");
+
+  }
+
+  /**
+   * Closes the driver and removes this instance from the cache so that a later
+   * {@link #getInstance(Neo4jConfiguration)} call creates a fresh client.
+   * Does nothing when the client was never started or is already stopped.
+   *
+   * @throws Exception on any failure while closing the driver
+   */
+  public void stop() throws Exception {
+    Driver driver = this.client;
+    this.client = null;
+    if (driver == null) {
+      return;
+    }
+    if (config != null) {
+      INSTANCE_MAP.remove(config, this);
+    }
+    // Close the driver itself; opening a new session just to close it left the driver open.
+    driver.close();
+  }
+
+  public Neo4jConfiguration config() {
+    return config;
+  }
+
+  public Driver client() {
+    return client;
+  }
+}