You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/05/01 00:03:10 UTC
incubator-streams git commit: resolves STREAMS-313 #313 neo4j runtime
dependencies are scope:provided and optional using the existing Http-based
writer does not require neo4j libraries using the Binary writer does,
but it's up to the implementing stream
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-313 [created] cb1aaea3e
resolves STREAMS-313 #313
neo4j runtime dependencies are scope:provided and optional
using the existing Http-based writer does not require neo4j libraries
using the Binary writer does, but it's up to the implementing stream to import it
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cb1aaea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cb1aaea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cb1aaea3
Branch: refs/heads/STREAMS-313
Commit: cb1aaea3e127cf4b9b72dd110883765450c74976
Parents: 6a05779
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Apr 30 17:03:05 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Apr 30 17:03:05 2015 -0500
----------------------------------------------------------------------
streams-contrib/streams-persist-graph/pom.xml | 97 +++++++++
.../streams/graph/GraphHttpPersistWriter.java | 205 ++++++++++++++++++
.../streams/graph/GraphPersistWriter.java | 216 -------------------
.../apache/streams/graph/GraphVertexReader.java | 4 +-
.../apache/streams/graph/HttpGraphHelper.java | 36 ++++
.../apache/streams/graph/QueryGraphHelper.java | 45 ++++
.../streams/graph/neo4j/BinaryGraphHelper.java | 109 ++++++++++
.../graph/neo4j/CypherQueryGraphHelper.java | 183 ++++++++++++++++
.../neo4j/Neo4jBinaryGraphPersistWriter.java | 200 +++++++++++++++++
.../graph/neo4j/Neo4jBinaryGraphUtil.java | 95 ++++++++
.../graph/neo4j/Neo4jHttpGraphHelper.java | 67 ++++++
.../streams/graph/GraphBinaryConfiguration.json | 28 +++
.../graph/GraphEdgeWriterConfiguration.json | 33 ---
.../streams/graph/GraphHttpConfiguration.json | 22 ++
.../streams/graph/GraphReaderConfiguration.json | 2 +-
.../graph/GraphVertexWriterConfiguration.json | 33 ---
.../streams/graph/GraphWriterConfiguration.json | 21 --
.../graph/test/Neo4jHttpPersistWriterIT.java | 102 +++++++++
.../test/TestNeo4jBinaryPersistWriter.java | 162 ++++++++++++++
.../graph/test/TestNeo4jVertexReader.java | 4 +-
20 files changed, 1356 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml
index a7b5c1d..edbae7e 100644
--- a/streams-contrib/streams-persist-graph/pom.xml
+++ b/streams-contrib/streams-persist-graph/pom.xml
@@ -50,6 +50,12 @@
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
<version>${project.version}</version>
</dependency>
@@ -63,6 +69,48 @@
<artifactId>stringtemplate</artifactId>
<version>4.0.2</version>
</dependency>
+ <dependency>
+ <groupId>org.javatuples</groupId>
+ <artifactId>javatuples</artifactId>
+ <version>1.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.neo4j</groupId>
+ <artifactId>neo4j-kernel</artifactId>
+ <version>2.2.1</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.neo4j</groupId>
+ <artifactId>neo4j-lucene-index</artifactId>
+ <version>2.2.1</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.neo4j</groupId>
+ <artifactId>neo4j-kernel</artifactId>
+ <version>2.2.1</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.neo4j</groupId>
+ <artifactId>neo4j-io</artifactId>
+ <version>2.2.1</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
@@ -118,6 +166,55 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeArtifactIds>streams-pojo</includeArtifactIds>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- revisit using this if streams bumps to jdk8 -->
+ <!--<plugin>-->
+ <!--<groupId>com.github.harti2006</groupId>-->
+ <!--<artifactId>neo4j-server-maven-plugin</artifactId>-->
+ <!--<version>0.1</version>-->
+ <!--<configuration>-->
+ <!--<port>7474</port>-->
+ <!--<version>2.2.1</version>-->
+ <!--</configuration>-->
+ <!--<executions>-->
+ <!--<execution>-->
+ <!--<id>start-neo4j-server</id>-->
+ <!--<phase>-->
+ <!--pre-integration-test-->
+ <!--</phase>-->
+ <!--<goals>-->
+ <!--<goal>start</goal>-->
+ <!--</goals>-->
+ <!--</execution>-->
+ <!--<execution>-->
+ <!--<id>stop-neo4j-server</id>-->
+ <!--<phase>-->
+ <!--post-integration-test-->
+ <!--</phase>-->
+ <!--<goals>-->
+ <!--<goal>stop</goal>-->
+ <!--</goals>-->
+ <!--</execution>-->
+ <!--</executions>-->
+ <!--</plugin>-->
</plugins>
</build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
new file mode 100644
index 0000000..71e1648
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
+import org.apache.streams.graph.neo4j.Neo4jHttpGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Adds activityobjects as vertices and activities as edges to a graph database with
+ * an http rest endpoint (such as neo4j)
+ */
+public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
+
+ public static final String STREAMS_ID = GraphHttpPersistWriter.class.getCanonicalName();
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(GraphHttpPersistWriter.class);
+ private final static long MAX_WRITE_LATENCY = 1000;
+
+ protected GraphHttpConfiguration configuration;
+
+ protected QueryGraphHelper queryGraphHelper;
+ protected HttpGraphHelper httpGraphHelper;
+
+ private static ObjectMapper mapper;
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Creates a writer configured from the "graph" section of
+     * StreamsConfigurator.config.
+     */
+    public GraphHttpPersistWriter() {
+        this(new ComponentConfigurator<GraphHttpConfiguration>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+    }
+
+    /**
+     * Creates a writer from an explicit graph configuration.
+     *
+     * The configuration is converted into the parent's
+     * HttpPersistWriterConfiguration; the REST resource path is then derived
+     * from the configured backend type and graph name. For any other backend
+     * type the resource path is left as converted.
+     *
+     * @param configuration graph connection settings; its type must not be null
+     */
+    public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
+        super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class));
+        this.configuration = configuration;
+
+        final GraphHttpConfiguration.Type backend = configuration.getType();
+        if (backend.equals(GraphHttpConfiguration.Type.NEO_4_J)) {
+            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
+        } else if (backend.equals(GraphHttpConfiguration.Type.REXSTER)) {
+            super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
+        }
+    }
+
+    /**
+     * Builds the request body for one datum: an object whose "statements"
+     * array holds a merge-vertex statement for the actor, one for the object,
+     * and a create-edge statement for the activity.
+     *
+     * The datum's document may be an Activity, an ObjectNode, or a JSON String;
+     * the latter two are converted to an Activity with the shared mapper.
+     * A vertex statement is only emitted for an actor/object that is present
+     * and has a non-empty id. The edge statement is only emitted when the verb
+     * is non-empty and both actor and object are present.
+     *
+     * @param entry datum whose document describes the activity to persist
+     * @return the request payload containing the "statements" array
+     * @throws NullPointerException if the document cannot be turned into an Activity
+     */
+    @Override
+    protected ObjectNode preparePayload(StreamsDatum entry) {
+
+        final Object document = entry.getDocument();
+        Activity activity = null;
+
+        if (document instanceof Activity) {
+            activity = (Activity) document;
+        } else if (document instanceof ObjectNode) {
+            activity = mapper.convertValue(document, Activity.class);
+        } else if (document instanceof String) {
+            try {
+                activity = mapper.readValue((String) document, Activity.class);
+            } catch (Throwable e) {
+                // Parse failures are only logged; the null check below rejects the datum.
+                LOGGER.warn(e.getMessage());
+            }
+        }
+
+        Preconditions.checkNotNull(activity);
+
+        ObjectNode request = mapper.createObjectNode();
+        ArrayNode statements = mapper.createArrayNode();
+
+        // The actor may be absent, so only force its object type when it exists.
+        // This used to dereference the actor unconditionally, which made the
+        // null check further down unreachable for a missing actor.
+        if (activity.getActor() != null) {
+            activity.getActor().setObjectType("page");
+        }
+
+        // always add vertices first
+
+        if (activity.getActor() != null &&
+                !Strings.isNullOrEmpty(activity.getActor().getId())) {
+            statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activity.getActor())));
+        }
+
+        if (activity.getObject() != null &&
+                !Strings.isNullOrEmpty(activity.getObject().getId())) {
+            statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activity.getObject())));
+        }
+
+        // then add edge
+
+        if (!Strings.isNullOrEmpty(activity.getVerb())) {
+            // An edge needs both endpoints.
+            // NOTE(review): presumably the query helper reads the actor and
+            // object ids, as BinaryGraphHelper.createEdgeRequest does - confirm.
+            if (activity.getActor() != null && activity.getObject() != null) {
+                statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity)));
+            } else {
+                LOGGER.debug("Skipping edge for activity {}: actor or object is missing", activity.getId());
+            }
+        }
+
+        request.put("statements", statements);
+        return request;
+
+    }
+
+    /**
+     * Executes the prepared POST and parses the JSON response body.
+     *
+     * A body is only read for HTTP 200/201 responses that carry an entity. The
+     * write is logged as successful when a body was parsed and its "errors"
+     * field is absent, not an array, or an empty array. I/O and other failures
+     * are logged rather than propagated.
+     *
+     * @param httpPost the request to execute; must not be null
+     * @return the parsed response body, or null when the request failed, the
+     *         status was not 200/201, or no body could be parsed
+     */
+    @Override
+    protected ObjectNode executePost(HttpPost httpPost) {
+
+        Preconditions.checkNotNull(httpPost);
+
+        ObjectNode result = null;
+        CloseableHttpResponse response = null;
+        String entityString = null;
+
+        try {
+            response = httpclient.execute(httpPost);
+            HttpEntity entity = response.getEntity();
+            int statusCode = response.getStatusLine().getStatusCode();
+
+            // Parenthesized so the entity null-check guards both status codes:
+            // '&&' binds tighter than '||', which previously let a 200 response
+            // without an entity through to EntityUtils.toString.
+            if ((statusCode == 200 || statusCode == 201) && entity != null) {
+                entityString = EntityUtils.toString(entity);
+                result = mapper.readValue(entityString, ObjectNode.class);
+            }
+            LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), statusCode, entityString);
+
+            if (result == null) {
+                // Nothing was parsed, so there is no "errors" field to report.
+                // Reading it from the null result used to raise a
+                // NullPointerException that was then logged as a write exception.
+                LOGGER.error("Write Error: status {} with no parsable response body", statusCode);
+            } else if (result.get("errors") != null &&
+                    result.get("errors").isArray() &&
+                    result.get("errors").iterator().hasNext()) {
+                LOGGER.error("Write Error: " + result.get("errors"));
+            } else {
+                LOGGER.info("Write Success");
+            }
+        } catch (IOException e) {
+            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
+        } catch (Exception e) {
+            LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
+        } finally {
+            try {
+                if (response != null) {
+                    response.close();
+                }
+            } catch (IOException e) {
+                // Best effort: the outcome is already decided, so only record the failure.
+                LOGGER.debug("Failed to close response: {}", e.getMessage());
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Prepares the parent HTTP writer, obtains the shared mapper, and selects
+     * the query and HTTP helpers for the configured graph backend. Only
+     * NEO_4_J has helpers assigned here.
+     *
+     * @param configurationObject passed through to the parent's prepare
+     * @throws NullPointerException if no helpers exist for the configured backend type
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        super.prepare(configurationObject);
+        mapper = StreamsJacksonMapper.getInstance();
+
+        if (configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
+            queryGraphHelper = new CypherQueryGraphHelper();
+            httpGraphHelper = new Neo4jHttpGraphHelper();
+        }
+
+        // Same exception type as before, but the message now names the cause
+        // instead of surfacing a bare NullPointerException.
+        Preconditions.checkNotNull(queryGraphHelper,
+                "no QueryGraphHelper available for graph type " + configuration.getType());
+        Preconditions.checkNotNull(httpGraphHelper,
+                "no HttpGraphHelper available for graph type " + configuration.getType());
+    }
+
+ /**
+ * Logs that the writer is exiting. This override performs no other work.
+ * NOTE(review): super.cleanUp() is not invoked here - confirm the parent
+ * holds nothing (e.g. its http client) that needs to be released.
+ */
+ @Override
+ public void cleanUp() {
+
+ LOGGER.info("exiting");
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
deleted file mode 100644
index 828319f..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.graph.neo4j.CypherGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Adds activityobjects as vertices and activities as edges to a graph database with
- * an http rest endpoint (such as neo4j)
- */
-public class GraphPersistWriter extends SimpleHTTPPostPersistWriter {
-
- public static final String STREAMS_ID = GraphPersistWriter.class.getCanonicalName();
-
- private final static Logger LOGGER = LoggerFactory.getLogger(GraphPersistWriter.class);
- private final static long MAX_WRITE_LATENCY = 1000;
-
- protected GraphWriterConfiguration configuration;
-
- protected GraphHelper graphHelper;
-
- private static ObjectMapper mapper;
-
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- public GraphPersistWriter() {
- this(new ComponentConfigurator<GraphWriterConfiguration>(GraphWriterConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
- }
-
- public GraphPersistWriter(GraphWriterConfiguration configuration) {
- super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class));
- if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) {
- super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
- }
- else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER)) {
- super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
- }
- this.configuration = configuration;
- }
-
- @Override
- protected ObjectNode preparePayload(StreamsDatum entry) {
-
- Activity activity = null;
-
- if (entry.getDocument() instanceof Activity) {
- activity = (Activity) entry.getDocument();
- } else if (entry.getDocument() instanceof ObjectNode) {
- activity = mapper.convertValue(entry.getDocument(), Activity.class);
- } else if (entry.getDocument() instanceof String) {
- try {
- activity = mapper.readValue((String) entry.getDocument(), Activity.class);
- } catch (Throwable e) {
- LOGGER.warn(e.getMessage());
- }
- }
-
- Preconditions.checkNotNull(activity);
-
- ObjectNode request = mapper.createObjectNode();
- ArrayNode statements = mapper.createArrayNode();
-
- activity.getActor().setObjectType("page");
-
- // always add vertices first
- // what types of verbs are relevant for adding vertices?
- if( configuration.getVertices().getVerbs().contains(activity.getVerb())) {
-
- // what objects and objectTypes are relevant for adding vertices?
- if( configuration.getVertices().getObjects().contains("actor") &&
- configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType())) {
- statements.add(graphHelper.mergeVertexRequest(activity.getActor()));
- }
- if( configuration.getVertices().getObjects().contains("object") &&
- configuration.getVertices().getObjectTypes().contains(activity.getObject().getObjectType())) {
- statements.add(graphHelper.mergeVertexRequest(activity.getObject()));
- }
- if( configuration.getVertices().getObjects().contains("provider") &&
- configuration.getVertices().getObjectTypes().contains(activity.getProvider().getObjectType())) {
- statements.add(graphHelper.mergeVertexRequest(activity.getProvider()));
- }
- if( configuration.getVertices().getObjects().contains("target") &&
- configuration.getVertices().getObjectTypes().contains(activity.getTarget().getObjectType())) {
- statements.add(graphHelper.mergeVertexRequest(activity.getProvider()));
- }
-
- }
-
- // what types of verbs are relevant for adding edges?
- if( configuration.getEdges().getVerbs().contains(activity.getVerb())) {
-
- // what objects and objectTypes are relevant for adding edges?
- if( configuration.getEdges().getObjects().contains("actor") &&
- configuration.getEdges().getObjects().contains("object") &&
- configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) &&
- configuration.getEdges().getObjectTypes().contains(activity.getObject().getObjectType())) {
- statements.add(graphHelper.createEdgeRequest(activity, activity.getActor(), activity.getObject()));
- }
- if( configuration.getEdges().getObjects().contains("actor") &&
- configuration.getEdges().getObjects().contains("target") &&
- configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) &&
- configuration.getEdges().getObjectTypes().contains(activity.getTarget().getObjectType())) {
- statements.add(graphHelper.createEdgeRequest(activity, activity.getActor(), activity.getTarget()));
- }
- if( configuration.getEdges().getObjects().contains("provider") &&
- configuration.getEdges().getObjects().contains("actor") &&
- configuration.getEdges().getObjectTypes().contains(activity.getProvider().getObjectType()) &&
- configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())) {
- statements.add(graphHelper.createEdgeRequest(activity, activity.getProvider(), activity.getActor()));
- }
- }
-
- request.put("statements", statements);
- return request;
-
- }
-
- @Override
- protected ObjectNode executePost(HttpPost httpPost) {
-
- Preconditions.checkNotNull(httpPost);
-
- ObjectNode result = null;
-
- CloseableHttpResponse response = null;
-
- String entityString = null;
- try {
- response = httpclient.execute(httpPost);
- HttpEntity entity = response.getEntity();
- if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) {
- entityString = EntityUtils.toString(entity);
- result = mapper.readValue(entityString, ObjectNode.class);
- }
- LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString);
- if( result == null ||
- (
- result.get("errors") != null &&
- result.get("errors").isArray() &&
- result.get("errors").iterator().hasNext()
- )
- ) {
- LOGGER.error("Write Error: " + result.get("errors"));
- } else {
- LOGGER.info("Write Success");
- }
- } catch (IOException e) {
- LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
- } catch (Exception e) {
- LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage());
- } finally {
- try {
- if( response != null) response.close();
- } catch (IOException e) {}
- }
- return result;
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
- super.prepare(configurationObject);
- mapper = StreamsJacksonMapper.getInstance();
-
- if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) {
- graphHelper = new CypherGraphHelper();
- }
-
- Preconditions.checkNotNull(graphHelper);
- }
-
- @Override
- public void cleanUp() {
-
- LOGGER.info("exiting");
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
index 5910136..93041d8 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
@@ -58,9 +58,9 @@ public class GraphVertexReader extends SimpleHttpProvider implements StreamsPers
public GraphVertexReader(GraphReaderConfiguration configuration) {
super(mapper.convertValue(configuration, HttpProviderConfiguration.class));
- if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J))
+ if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J))
super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
- else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER))
+ else if( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER))
super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
this.configuration = configuration;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
new file mode 100644
index 0000000..0833ba0
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+
+import java.util.Map;
+
+/**
+ * Interface for methods allowing persistence to a graph database wrapped with
+ * a REST API. Neo4jHttpGraphHelper is the neo4j implementation selected by
+ * GraphHttpPersistWriter.
+ */
+public interface HttpGraphHelper {
+
+ /**
+ * Wraps a query and its parameters in a request node for the HTTP endpoint.
+ *
+ * @param queryPlusParameters pair of query string and parameter map
+ * @return the request node; GraphHttpPersistWriter adds it to its "statements" array
+ */
+ public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
new file mode 100644
index 0000000..eeacdae
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+
+import java.util.Map;
+
+/**
+ * Interface for methods that build persistence requests for a graph database
+ * which is driven through a query DSL. Every method returns a pair of a String
+ * and a Map (presumably the query text and its named parameters - confirm
+ * against the implementation). CypherQueryGraphHelper is the neo4j implementation.
+ */
+public interface QueryGraphHelper {
+
+ /** Builds a request that looks up a vertex by its String id. */
+ public Pair<String, Map<String, Object>> getVertexRequest(String streamsId);
+
+ /** Builds a request that looks up a vertex by its Long id (presumably the database's own vertex id - confirm). */
+ public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId);
+
+ /** Builds a request that creates a vertex for the given activity object. */
+ public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject);
+
+ /** Builds a request that merges a vertex for the given activity object; GraphHttpPersistWriter uses this for actor and object. */
+ public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject);
+
+ /** Builds a request that creates an edge for the given activity; GraphHttpPersistWriter issues it after the vertex requests. */
+ public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
new file mode 100644
index 0000000..3dc8ffc
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+import org.javatuples.Quartet;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Supporting class that turns activity objects and activities into tuples of
+ * identifiers plus flattened properties, and that renders property maps as
+ * query fragments.
+ * NOTE(review): presumably consumed by the binary neo4j writer in this
+ * package rather than the REST writer - confirm.
+ */
+public class BinaryGraphHelper {
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /**
+     * Describes a vertex as (id, properties).
+     *
+     * @param activityObject source object; its objectType must not be null
+     * @return pair of the flattened "id" property and the full flattened property map
+     */
+    public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        // Properties are flattened with '.' as the path separator.
+        ObjectNode asNode = mapper.convertValue(activityObject, ObjectNode.class);
+        Map<String, Object> flattened = PropertyUtil.flattenToMap(asNode, '.');
+
+        return new Pair(flattened.get("id"), flattened);
+    }
+
+    /**
+     * Describes an edge as (actor id, object id, activity id, properties).
+     *
+     * @param activity source activity; its actor and object must not be null
+     * @return quartet of the three ids and the flattened activity properties
+     */
+    public Quartet<String, String, String, Map<String, Object>> createEdgeRequest(Activity activity) {
+
+        ObjectNode asNode = mapper.convertValue(activity, ObjectNode.class);
+        Map<String, Object> flattened = PropertyUtil.flattenToMap(asNode, '.');
+
+        return new Quartet(
+                activity.getActor().getId(),
+                activity.getObject().getId(),
+                activity.getId(),
+                flattened);
+    }
+
+    /**
+     * Renders every String-valued entry as {@code ,symbol.`key` = 'value'};
+     * entries with non-String values are skipped.
+     *
+     * @param map    properties to render
+     * @param symbol identifier the properties are set on
+     * @return the concatenated assignments, empty when nothing qualifies
+     */
+    public static String getPropertyValueSetter(Map<String, Object> map, String symbol) {
+        StringBuilder assignments = new StringBuilder();
+        for (Map.Entry<String, Object> property : map.entrySet()) {
+            Object value = property.getValue();
+            if (!(value instanceof String)) {
+                continue;
+            }
+            assignments.append(',').append(symbol)
+                    .append(".`").append(property.getKey())
+                    .append("` = '").append((String) value).append('\'');
+        }
+        return assignments.toString();
+    }
+
+    /**
+     * Produces exactly the same text as
+     * {@link #getPropertyValueSetter(Map, String)}.
+     *
+     * @param map    properties to render
+     * @param symbol identifier the properties are set on
+     * @return the concatenated assignments, empty when nothing qualifies
+     */
+    public static String getPropertyParamSetter(Map<String, Object> map, String symbol) {
+        return getPropertyValueSetter(map, symbol);
+    }
+
+    /**
+     * Renders every String-valued entry as {@code `key`:'value'}, joined by
+     * commas and wrapped in braces; entries with non-String values are skipped.
+     *
+     * @param map properties to render
+     * @return the brace-delimited property literal, "{}" when nothing qualifies
+     */
+    public static String getPropertyCreater(Map<String, Object> map) {
+        List<String> rendered = Lists.newArrayList();
+        for (Map.Entry<String, Object> property : map.entrySet()) {
+            Object value = property.getValue();
+            if (value instanceof String) {
+                rendered.add("`" + property.getKey() + "`:'" + (String) value + "'");
+            }
+        }
+        return "{" + Joiner.on(",").join(rendered) + "}";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
new file mode 100644
index 0000000..472a762
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds Cypher statements, together with their parameter maps, for reading
+ * and writing activity-streams vertices and edges in neo4j.
+ */
+public class CypherQueryGraphHelper implements QueryGraphHelper {
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    // StringTemplate sources: <name> placeholders are filled in by the request builders below.
+    // NOTE(review): "{ props }" is not filled in here; presumably it is a Cypher
+    // parameter that the caller binds to the returned map - confirm.
+    public final static String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v";
+    public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
+
+    public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
+            "CREATE UNIQUE (n:<type> { props }) "+
+            "RETURN n";
+
+    public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
+            "ON CREATE SET v:<type>, v = { props }, v.`@timestamp` = timestamp() "+
+            "ON MATCH SET v = { props }, v.`@timestamp` = timestamp() "+
+            "RETURN v";
+
+    public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
+            "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
+            "RETURN r";
+
+    /**
+     * Builds the statement that looks a vertex up by its {@code id} property.
+     *
+     * @param streamsId value of the vertex {@code id} property
+     * @return the rendered statement paired with a null parameter map
+     */
+    public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
+
+        ST getVertex = new ST(getVertexStringIdStatementTemplate);
+        getVertex.add("id", escapeLiteral(streamsId));
+
+        return new Pair<String, Map<String, Object>>(getVertex.render(), null);
+    }
+
+    /**
+     * Builds the statement that looks a vertex up by its internal node id.
+     *
+     * @param vertexId internal node id compared against {@code ID(v)}
+     * @return the rendered statement paired with a null parameter map
+     */
+    @Override
+    public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
+
+        ST getVertex = new ST(getVertexLongIdStatementTemplate);
+        getVertex.add("id", vertexId);
+
+        return new Pair<String, Map<String, Object>>(getVertex.render(), null);
+    }
+
+    /**
+     * Builds a create statement for one activity object.
+     *
+     * @param activityObject object to store; its objectType must not be null
+     * @return the rendered statement paired with the object's flattened properties
+     * @throws NullPointerException if the objectType is null
+     */
+    public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ST createVertex = new ST(createVertexStatementTemplate);
+        createVertex.add("id", escapeLiteral(activityObject.getId()));
+        createVertex.add("type", activityObject.getObjectType());
+
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        return new Pair<String, Map<String, Object>>(createVertex.render(), props);
+    }
+
+    /**
+     * Builds a merge (create-or-update) statement for one activity object.
+     *
+     * @param activityObject object to store; its objectType must not be null
+     * @return the rendered statement paired with the object's flattened properties
+     * @throws NullPointerException if the objectType is null
+     */
+    public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) {
+
+        Preconditions.checkNotNull(activityObject.getObjectType());
+
+        ST mergeVertex = new ST(mergeVertexStatementTemplate);
+        mergeVertex.add("id", escapeLiteral(activityObject.getId()));
+        mergeVertex.add("type", activityObject.getObjectType());
+
+        ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        // Pair is immutable (setAt0/setAt1 return a new tuple), so build the result in one step
+        return new Pair<String, Map<String, Object>>(mergeVertex.render(), props);
+    }
+
+    /**
+     * Builds the statement that creates an edge from the activity's actor to
+     * its object, typed by the activity's verb.
+     *
+     * Side effect: actor, object, target and the "extensions" additional
+     * property of the supplied activity are set to null.
+     *
+     * @param activity activity to store; actor and object must be present
+     * @return the rendered statement paired with the activity's flattened properties
+     */
+    public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) {
+
+        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
+        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+        ST mergeEdge = new ST(createEdgeStatementTemplate);
+        mergeEdge.add("s_id", escapeLiteral(activity.getActor().getId()));
+        mergeEdge.add("s_type", activity.getActor().getObjectType());
+        mergeEdge.add("d_id", escapeLiteral(activity.getObject().getId()));
+        mergeEdge.add("d_type", activity.getObject().getObjectType());
+        mergeEdge.add("r_id", activity.getId());
+        mergeEdge.add("r_type", activity.getVerb());
+        mergeEdge.add("r_props", getPropertyCreater(props));
+
+        // set the activityObject's and extensions null, because their properties don't need to appear on the relationship
+        // NOTE(review): props was flattened above, before these fields are cleared, so
+        // clearing them does not change r_props or the returned map - confirm intended order.
+        activity.setActor(null);
+        activity.setObject(null);
+        activity.setTarget(null);
+        activity.getAdditionalProperties().put("extensions", null);
+
+        // Pair is immutable (setAt0/setAt1 return a new tuple), so build the result in one step
+        return new Pair<String, Map<String, Object>>(mergeEdge.render(), props);
+    }
+
+    /**
+     * Builds a comma-prefixed list of assignments, one per String-valued entry,
+     * each of the form {@code ,symbol.`key` = 'value'}. Non-String values are skipped.
+     *
+     * @param map property names mapped to property values
+     * @param symbol Cypher identifier the properties are assigned on
+     * @return the concatenated assignments, empty when no entry qualifies
+     */
+    public static String getPropertyValueSetter(Map<String, Object> map, String symbol) {
+        StringBuilder builder = new StringBuilder();
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            if (entry.getValue() instanceof String) {
+                String propVal = escapeLiteral((String) entry.getValue());
+                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'");
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Produces the same output as {@link #getPropertyValueSetter(Map, String)}.
+     *
+     * NOTE(review): despite the name this emits literal values, not parameter
+     * placeholders - confirm the intent.
+     *
+     * @param map property names mapped to property values
+     * @param symbol Cypher identifier the properties are assigned on
+     * @return the concatenated assignments, empty when no entry qualifies
+     */
+    public static String getPropertyParamSetter(Map<String, Object> map, String symbol) {
+        return getPropertyValueSetter(map, symbol);
+    }
+
+    /**
+     * Renders the String-valued entries of a map as a property map literal:
+     * {@code {`key`:'value',`key2`:'value2'}}. Non-String values are skipped.
+     *
+     * @param map property names mapped to property values
+     * @return the property map literal; {@code {}} when no entry qualifies
+     */
+    public static String getPropertyCreater(Map<String, Object> map) {
+        List<String> parts = Lists.newArrayList();
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            if (entry.getValue() instanceof String) {
+                String propVal = escapeLiteral((String) entry.getValue());
+                parts.add("`" + entry.getKey() + "`:'" + propVal + "'");
+            }
+        }
+        return "{" + Joiner.on(",").join(parts) + "}";
+    }
+
+    /**
+     * Backslash-escapes backslashes and single quotes so the value can be
+     * placed inside a single-quoted literal; null is returned unchanged.
+     */
+    private static String escapeLiteral(String value) {
+        if (value == null) {
+            return null;
+        }
+        // escape the backslash first so the quote escape is not re-escaped
+        return value.replace("\\", "\\\\").replace("'", "\\'");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphPersistWriter.java
new file mode 100644
index 0000000..8ca5fff
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphPersistWriter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.graph.GraphBinaryConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+import org.javatuples.Quartet;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Label;
+import org.neo4j.graphdb.factory.GraphDatabaseFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Adds activityobjects as vertices and activities as edges to a graph database file which will be
+ * loaded inside of neo4j
+ */
+public class Neo4jBinaryGraphPersistWriter implements StreamsPersistWriter {
+
+    public static final String STREAMS_ID = Neo4jBinaryGraphPersistWriter.class.getCanonicalName();
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(Neo4jBinaryGraphPersistWriter.class);
+    private final static long MAX_WRITE_LATENCY = 1000;
+
+    protected GraphBinaryConfiguration configuration;
+
+    // assigned in prepare()
+    private static ObjectMapper mapper;
+
+    // embedded database handle, opened in prepare()
+    public GraphDatabaseService graph;
+    private Neo4jBinaryGraphUtil graphutil;
+    private CypherQueryGraphHelper queryGraphHelper;
+    private BinaryGraphHelper binaryGraphHelper;
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Creates a writer configured from the "graph" section of the streams configuration.
+     */
+    public Neo4jBinaryGraphPersistWriter() {
+        this(new ComponentConfigurator<GraphBinaryConfiguration>(GraphBinaryConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+    }
+
+    /**
+     * Creates a writer with an explicit configuration.
+     *
+     * @param configuration supplies the database file and the fields to index
+     */
+    public Neo4jBinaryGraphPersistWriter(GraphBinaryConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * Opens the embedded database at the configured file and, when that file was
+     * not readable beforehand, adds unique constraints under the "streams" label
+     * for "id" and for every configured index field.
+     *
+     * @param configurationObject not used by this implementation
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+
+        mapper = StreamsJacksonMapper.getInstance();
+
+        // must be evaluated before the database is opened below
+        boolean newGraph = !FileUtils.getFile(configuration.getFile()).canRead();
+
+        graph = new GraphDatabaseFactory().newEmbeddedDatabase(configuration.getFile());
+
+        graphutil = new Neo4jBinaryGraphUtil();
+
+        queryGraphHelper = new CypherQueryGraphHelper();
+
+        binaryGraphHelper = new BinaryGraphHelper();
+
+        String globalLabel = "streams";
+
+        if (newGraph) {
+            graphutil.addUniqueIndex(graph, globalLabel, "id", false);
+            for (String field : configuration.getIndexFields()) {
+                graphutil.addUniqueIndex(graph, globalLabel, field, false);
+            }
+        }
+
+    }
+
+    /**
+     * Logs that the writer is exiting.
+     *
+     * NOTE(review): the embedded database opened in prepare() is not shut down
+     * here; confirm whether callers still read {@link #graph} after cleanUp()
+     * before adding a shutdown.
+     */
+    @Override
+    public void cleanUp() {
+
+        LOGGER.info("exiting");
+
+    }
+
+    /**
+     * Writes one datum to the graph. An ActivityObject becomes a single vertex.
+     * An Activity becomes a vertex for its actor, a vertex for its object and,
+     * when it has a verb, an edge. ObjectNode and String documents are first
+     * converted to Activity, falling back to ActivityObject.
+     *
+     * @param entry datum whose document is an Activity, ActivityObject, ObjectNode or String
+     * @throws IllegalArgumentException if the document could not be interpreted
+     */
+    @Override
+    public void write(StreamsDatum entry) {
+
+        Activity activity = null;
+        ActivityObject activityObject = null;
+
+        if (entry.getDocument() instanceof Activity) {
+            activity = (Activity) entry.getDocument();
+        } else if (entry.getDocument() instanceof ActivityObject) {
+            activityObject = (ActivityObject) entry.getDocument();
+        } else if (entry.getDocument() instanceof ObjectNode) {
+            try {
+                activity = mapper.convertValue(entry.getDocument(), Activity.class);
+            } catch (Exception e) {
+                activityObject = mapper.convertValue(entry.getDocument(), ActivityObject.class);
+            }
+        } else if (entry.getDocument() instanceof String) {
+            try {
+                activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+            } catch (Exception e1) {
+                try {
+                    activityObject = mapper.readValue((String) entry.getDocument(), ActivityObject.class);
+                } catch (Exception e2) {
+                    LOGGER.error("Can't handle input: ", e2);
+                }
+            }
+        }
+
+        Preconditions.checkArgument(activity != null || activityObject != null);
+
+        // labels shared by everything written for this datum
+        List<String> baseLabels = Lists.newArrayList("streams");
+
+        if (activityObject != null) {
+            addVertex(activityObject, baseLabels);
+        } else if (activity != null) {
+
+            // always add vertices first
+
+            if (activity.getProvider() != null &&
+                    !Strings.isNullOrEmpty(activity.getProvider().getId())) {
+                baseLabels.add(activity.getProvider().getId());
+            }
+
+            if (activity.getActor() != null &&
+                    !Strings.isNullOrEmpty(activity.getActor().getId())) {
+                addVertex(activity.getActor(), baseLabels);
+            }
+
+            if (activity.getObject() != null &&
+                    !Strings.isNullOrEmpty(activity.getObject().getId())) {
+                addVertex(activity.getObject(), baseLabels);
+            }
+
+            // then add edge
+
+            if (!Strings.isNullOrEmpty(activity.getVerb())) {
+                // addRelationship reads the first label to find the end nodes and the last as the type
+                List<String> edgeLabels = Lists.newArrayList(baseLabels);
+                edgeLabels.add(activity.getVerb());
+                Quartet<String, String, String, Map<String, Object>> addRelationship = binaryGraphHelper.createEdgeRequest(activity);
+                graphutil.addRelationship(
+                        graph,
+                        edgeLabels,
+                        addRelationship);
+            }
+
+        }
+
+    }
+
+    /**
+     * Adds one vertex labelled with the shared labels plus its own objectType.
+     * The shared list is copied so one vertex's type never leaks onto the next.
+     */
+    private void addVertex(ActivityObject vertex, List<String> baseLabels) {
+        List<String> labels = Lists.newArrayList(baseLabels);
+        if (vertex.getObjectType() != null) {
+            labels.add(vertex.getObjectType());
+        }
+        Pair<String, Map<String, Object>> addNode = binaryGraphHelper.createVertexRequest(vertex);
+        graphutil.addNode(
+                graph,
+                labels,
+                addNode);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphUtil.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphUtil.java
new file mode 100644
index 0000000..086b443
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jBinaryGraphUtil.java
@@ -0,0 +1,95 @@
+package org.apache.streams.graph.neo4j;
+
+import com.google.common.collect.Lists;
+import org.javatuples.Pair;
+import org.javatuples.Quartet;
+import org.neo4j.graphdb.Direction;
+import org.neo4j.graphdb.DynamicLabel;
+import org.neo4j.graphdb.DynamicRelationshipType;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.graphdb.Label;
+import org.neo4j.graphdb.Node;
+import org.neo4j.graphdb.Result;
+import org.neo4j.graphdb.Transaction;
+import org.neo4j.graphdb.schema.Schema;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helpers that apply schema and data changes to a neo4j
+ * {@link GraphDatabaseService}, each inside its own transaction.
+ */
+public class Neo4jBinaryGraphUtil {
+
+    CypherQueryGraphHelper queryGraphHelper = new CypherQueryGraphHelper();
+
+    /**
+     * Creates a uniqueness constraint on one property of one label.
+     *
+     * @param graph database to modify
+     * @param label label the constraint applies to
+     * @param property property that must be unique
+     * @param wait when true, waits up to two hours for indexes to come online; null counts as false
+     */
+    protected void addUniqueIndex(GraphDatabaseService graph, String label, String property, Boolean wait ) {
+        Transaction tx = graph.beginTx();
+        try {
+            Schema schema = graph.schema();
+            schema
+                    .constraintFor(DynamicLabel.label(label))
+                    .assertPropertyIsUnique(property)
+                    .create();
+            // NOTE(review): this waits inside the transaction that created the constraint;
+            // confirm that is supported before calling with wait=true
+            if (Boolean.TRUE.equals(wait)) {
+                schema.awaitIndexesOnline(2L, TimeUnit.HOURS);
+            }
+            tx.success();
+        } finally {
+            // closing without success() rolls back, so no explicit failure() is needed
+            tx.close();
+        }
+    }
+
+    /**
+     * Creates an index on one property of one label.
+     *
+     * @param graph database to modify
+     * @param label label the index applies to
+     * @param property property to index
+     * @param wait when true, waits up to two hours for indexes to come online; null counts as false
+     */
+    protected void addIndex(GraphDatabaseService graph, String label, String property, Boolean wait) {
+        Transaction tx = graph.beginTx();
+        try {
+            Schema schema = graph.schema();
+            schema
+                    .indexFor(DynamicLabel.label(label))
+                    .on(property)
+                    .create();
+            // NOTE(review): this waits inside the transaction that created the index;
+            // confirm that is supported before calling with wait=true
+            if (Boolean.TRUE.equals(wait)) {
+                schema.awaitIndexesOnline(2L, TimeUnit.HOURS);
+            }
+            tx.success();
+        } finally {
+            // closing without success() rolls back, so no explicit failure() is needed
+            tx.close();
+        }
+    }
+
+
+    /**
+     * Creates a node carrying the given labels, an "id" property and every
+     * supplied property. Any failure rolls the node back and is not reported.
+     *
+     * @param graph database to modify
+     * @param labelStringList labels to attach to the node
+     * @param nodeIdPlusProperties node id paired with the properties to set
+     */
+    protected void addNode(GraphDatabaseService graph, List<String> labelStringList, Pair<String, Map<String, Object>> nodeIdPlusProperties) {
+        // convert labels before opening the transaction so a failure here cannot leave one open
+        List<Label> labelList = Lists.newArrayList();
+        for (String labelString : labelStringList) {
+            labelList.add(DynamicLabel.label(labelString));
+        }
+        Transaction tx = graph.beginTx();
+        try {
+            Node node = graph.createNode(labelList.toArray(new Label[0]));
+            node.setProperty("id", nodeIdPlusProperties.getValue0());
+            for (Map.Entry<String, Object> property : nodeIdPlusProperties.getValue1().entrySet()) {
+                node.setProperty(property.getKey(), property.getValue());
+            }
+            tx.success();
+        } catch (Exception e) {
+            // NOTE(review): dropped without logging; presumably deliberate for
+            // duplicates rejected by a unique constraint - confirm
+            tx.failure();
+        } finally {
+            tx.close();
+        }
+    }
+
+    /**
+     * Creates a relationship between two existing nodes. Both nodes are looked
+     * up by "id" under the first label in the list; the last label in the list
+     * names the relationship type. Any failure, including a missing end node,
+     * rolls back and is not reported.
+     *
+     * NOTE(review): the third and fourth tuple values are not read here, so no
+     * properties are stored on the relationship - confirm that is intended.
+     *
+     * @param graph database to modify
+     * @param labelStringList first entry: lookup label; last entry: relationship type
+     * @param relationshipIdsPlusProperties source id, destination id and two values not used here
+     */
+    protected void addRelationship(GraphDatabaseService graph, List<String> labelStringList, Quartet<String, String, String, Map<String, Object>> relationshipIdsPlusProperties) {
+        Transaction tx = graph.beginTx();
+        try {
+            Label lookupLabel = DynamicLabel.label(labelStringList.get(0));
+            Node source = graph.findNodesByLabelAndProperty(lookupLabel, "id", relationshipIdsPlusProperties.getValue0()).iterator().next();
+            Node destination = graph.findNodesByLabelAndProperty(lookupLabel, "id", relationshipIdsPlusProperties.getValue1()).iterator().next();
+            String relationshipType = labelStringList.get(labelStringList.size() - 1);
+            source.createRelationshipTo(destination, DynamicRelationshipType.withName(relationshipType));
+            tx.success();
+        } catch (Exception e) {
+            tx.failure();
+        } finally {
+            tx.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
new file mode 100644
index 0000000..a076cff
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.neo4j;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.javatuples.Pair;
+import org.stringtemplate.v4.ST;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wraps a statement and its parameter map in the JSON request body sent to
+ * neo4j over HTTP.
+ */
+public class Neo4jHttpGraphHelper implements HttpGraphHelper {
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    // field names used in the request body
+    public final static String statementKey = "statement";
+    public final static String paramsKey = "parameters";
+    public final static String propsKey = "props";
+
+    /**
+     * Produces {@code {"statement": <query>, "parameters": {"props": <map>}}}.
+     *
+     * @param queryPlusParameters statement text paired with its property map; neither may be null
+     * @return the request body
+     * @throws NullPointerException if the pair, the statement or the map is null
+     */
+    public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) {
+
+        Preconditions.checkNotNull(queryPlusParameters);
+        String statement = Preconditions.checkNotNull(queryPlusParameters.getValue0());
+        Map<String, Object> properties = Preconditions.checkNotNull(queryPlusParameters.getValue1());
+
+        ObjectNode body = mapper.createObjectNode();
+        body.put(statementKey, statement);
+
+        // the property map travels as the single "props" parameter
+        ObjectNode parameters = mapper.createObjectNode();
+        ObjectNode propertyNode = mapper.convertValue(properties, ObjectNode.class);
+        parameters.put(propsKey, propertyNode);
+
+        body.put(paramsKey, parameters);
+        return body;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
new file mode 100644
index 0000000..04a70e1
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
@@ -0,0 +1,28 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "javaType" : "org.apache.streams.graph.GraphBinaryConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "type": {
+ "type": "string",
+ "description": "Graph DB type",
+ "enum" : ["neo4j", "gremlin"]
+ },
+ "file": {
+ "type": "string",
+ "description": "New Graph DB File"
+ },
+ "indexFields": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "description": "Fields to index under streams label"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
deleted file mode 100644
index f14f52c..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "javaType" : "org.apache.streams.graph.GraphEdgeWriterConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "objects": {
- "type": "array",
- "required": false,
- "items": {
- "type": "string"
- }
- },
- "verbs": {
- "type": "array",
- "required": false,
- "items": {
- "type": "string"
- }
- },
- "objectTypes": {
- "type": "array",
- "required": false,
- "items": {
- "type": "string"
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
new file mode 100644
index 0000000..c63e0fb
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
@@ -0,0 +1,22 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "javaType" : "org.apache.streams.graph.GraphHttpConfiguration",
+ "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "type": {
+ "type": "string",
+ "description": "Graph DB type",
+ "enum" : ["neo4j", "rexster"]
+ },
+ "graph": {
+ "type": "string",
+ "description": "Graph DB Graph ID"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
index ca588f3..6cffba0 100644
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
+++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphReaderConfiguration.json
@@ -6,7 +6,7 @@
],
"id": "#",
"javaType" : "org.apache.streams.graph.GraphReaderConfiguration",
- "extends" : {"$ref":"GraphConfiguration.json"},
+ "extends" : {"$ref":"GraphHttpConfiguration.json"},
"javaInterfaces": ["java.io.Serializable"],
"properties": {
"query": {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
deleted file mode 100644
index 66fc236..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "javaType" : "org.apache.streams.graph.GraphVertexWriterConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "objects": {
- "type": "array",
- "required": false,
- "items": {
- "type": "string"
- }
- },
- "verbs": {
- "type": "array",
- "required": false,
- "items": {
- "type": "string"
- }
- },
- "objectTypes": {
- "type": "array",
- "required": false,
- "items": {
- "type": "string"
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
deleted file mode 100644
index 0c7e304..0000000
--- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "javaType" : "org.apache.streams.graph.GraphWriterConfiguration",
- "extends" : {"$ref":"GraphConfiguration.json"},
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "vertices": {
- "type": "object",
- "javaType": "org.apache.streams.graph.GraphVertexWriterConfiguration"
- },
- "edges": {
- "type": "object",
- "javaType": "org.apache.streams.graph.GraphEdgeWriterConfiguration"
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/Neo4jHttpPersistWriterIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/Neo4jHttpPersistWriterIT.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/Neo4jHttpPersistWriterIT.java
new file mode 100644
index 0000000..d5105ec
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/Neo4jHttpPersistWriterIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.graph.GraphHttpConfiguration;
+import org.apache.streams.graph.GraphHttpPersistWriter;
+import org.apache.streams.graph.GraphVertexReader;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.neo4j.graphdb.GraphDatabaseService;
+import org.neo4j.test.TestGraphDatabaseFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Integration test for
+ * @see {@link GraphHttpPersistWriter}
+ *
+ * Test that graph db http writes to neo4j rest API
+ *
+ *
+ */
+@Ignore("Need to find a way to launch neo4j during verify step to use this")
+public class Neo4jHttpPersistWriterIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpPersistWriterIT.class);
+
+    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private GraphHttpConfiguration testConfiguration;
+
+    private GraphHttpPersistWriter graphPersistWriter;
+
+    /**
+     * Points the writer at a neo4j REST endpoint on localhost:7474.
+     */
+    @Before
+    public void prepareTest() throws IOException {
+
+        testConfiguration = new GraphHttpConfiguration();
+        testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
+        testConfiguration.setGraph("data");
+        testConfiguration.setHostname("localhost");
+        testConfiguration.setPort(7474L);
+        testConfiguration.setContentType("application/json");
+        testConfiguration.setProtocol("http");
+
+        graphPersistWriter = new GraphHttpPersistWriter(testConfiguration);
+
+        graphPersistWriter.prepare(testConfiguration);
+    }
+
+    /**
+     * Writes every activity found under the "activities" test resource folder.
+     */
+    @Test
+    public void testNeo4jHttpPersistWriter() throws IOException {
+
+        ClassLoader classLoader = Neo4jHttpPersistWriterIT.class.getClassLoader();
+
+        List<String> files;
+        InputStream testActivityFolderStream = classLoader.getResourceAsStream("activities");
+        try {
+            files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+        } finally {
+            IOUtils.closeQuietly(testActivityFolderStream);
+        }
+
+        for (String file : files) {
+            LOGGER.info("File: " + file );
+            Activity activity;
+            InputStream testActivityFileStream = classLoader.getResourceAsStream("activities/" + file);
+            try {
+                activity = mapper.readValue(testActivityFileStream, Activity.class);
+            } finally {
+                IOUtils.closeQuietly(testActivityFileStream);
+            }
+            // use the object types as ids
+            activity.getActor().setId(activity.getActor().getObjectType());
+            activity.getObject().setId(activity.getObject().getObjectType());
+            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+            graphPersistWriter.write( datum );
+            LOGGER.info("Wrote: " + activity.getVerb() );
+        }
+
+        graphPersistWriter.cleanUp();
+
+        // hit neo with http and check vertex/edge counts
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jBinaryPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jBinaryPersistWriter.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jBinaryPersistWriter.java
new file mode 100644
index 0000000..8295734
--- /dev/null
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jBinaryPersistWriter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.graph.test;
+
+ import com.fasterxml.jackson.databind.JsonNode;
+ import com.fasterxml.jackson.databind.ObjectMapper;
+ import com.fasterxml.jackson.databind.node.ObjectNode;
+ import com.google.common.base.Strings;
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
+ import junit.framework.Assert;
+ import org.apache.commons.io.Charsets;
+ import org.apache.commons.io.IOUtils;
+ import org.apache.streams.core.StreamsDatum;
+ import org.apache.streams.graph.GraphBinaryConfiguration;
+ import org.apache.streams.graph.GraphReaderConfiguration;
+ import org.apache.streams.graph.GraphVertexReader;
+ import org.apache.streams.graph.neo4j.Neo4jBinaryGraphPersistWriter;
+ import org.apache.streams.jackson.StreamsJacksonMapper;
+ import org.apache.streams.pojo.json.Activity;
+ import org.apache.streams.pojo.json.ActivityObject;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.neo4j.graphdb.Direction;
+ import org.neo4j.graphdb.DynamicLabel;
+ import org.neo4j.graphdb.DynamicRelationshipType;
+ import org.neo4j.graphdb.GraphDatabaseService;
+ import org.neo4j.graphdb.Node;
+ import org.neo4j.graphdb.Relationship;
+ import org.neo4j.graphdb.RelationshipType;
+ import org.neo4j.graphdb.ResourceIterator;
+ import org.neo4j.graphdb.Transaction;
+ import org.neo4j.graphdb.factory.GraphDatabaseFactory;
+ import org.reflections.Reflections;
+ import org.reflections.scanners.SubTypesScanner;
+ import org.reflections.util.ClasspathHelper;
+ import org.reflections.util.ConfigurationBuilder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.List;
+ import java.util.Set;
+
+/**
+ * Unit test for
+ * @see {@link GraphVertexReader}
+ *
+ * Test that graph db responses can be converted to streams data
+ */
+public class TestNeo4jBinaryPersistWriter {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TestNeo4jBinaryPersistWriter.class);
+
+ private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private final String testGraphFile = "target/graph.neo4j";
+
+ private GraphBinaryConfiguration testConfiguration;
+
+ private Neo4jBinaryGraphPersistWriter graphPersistWriter;
+
+ @Before
+ public void prepareTest() throws IOException {
+
+ testConfiguration = new GraphBinaryConfiguration();
+ testConfiguration.setType(GraphBinaryConfiguration.Type.NEO_4_J);
+ testConfiguration.setFile(testGraphFile);
+
+ graphPersistWriter = new Neo4jBinaryGraphPersistWriter(testConfiguration);
+
+ graphPersistWriter.prepare(testConfiguration);
+
+ assert(graphPersistWriter.graph.isAvailable(5000));
+
+ }
+
+ @Test
+ public void testNeo4jBinaryPersistWriter() throws Exception {
+
+ InputStream testActivityFolderStream = TestNeo4jBinaryPersistWriter.class.getClassLoader()
+ .getResourceAsStream("activities");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+ for( String file : files) {
+ LOGGER.info("File: " + file);
+ InputStream testActivityFileStream = TestNeo4jBinaryPersistWriter.class.getClassLoader()
+ .getResourceAsStream("activities/" + file);
+ Activity activity = mapper.readValue(testActivityFileStream, Activity.class);
+ activity.getActor().setId(activity.getActor().getObjectType());
+ activity.getObject().setId(activity.getObject().getObjectType());
+ if( !Strings.isNullOrEmpty((String)activity.getObject().getAdditionalProperties().get("verb"))) {
+ activity.getObject().setObjectType((String) activity.getObject().getAdditionalProperties().get("verb"));
+ activity.getObject().setId(activity.getObject().getObjectType());
+ }
+ if( !Strings.isNullOrEmpty(activity.getActor().getId())) {
+ StreamsDatum actorDatum = new StreamsDatum(activity.getActor(), activity.getActor().getId());
+ graphPersistWriter.write( actorDatum );
+ }
+ if( !Strings.isNullOrEmpty(activity.getObject().getId())) {
+ StreamsDatum objectDatum = new StreamsDatum(activity.getObject(), activity.getObject().getId());
+ graphPersistWriter.write( objectDatum );
+ }
+ if( !Strings.isNullOrEmpty(activity.getVerb()) &&
+ !Strings.isNullOrEmpty(activity.getActor().getId()) &&
+ !Strings.isNullOrEmpty(activity.getObject().getId())) {
+ StreamsDatum activityDatum = new StreamsDatum(activity, activity.getVerb());
+ graphPersistWriter.write( activityDatum );
+ }
+ LOGGER.info("Wrote: " + activity.getVerb());
+ }
+
+ graphPersistWriter.cleanUp();
+
+ graphPersistWriter.graph.beginTx();
+ Node organization = graphPersistWriter.graph.findNodes(DynamicLabel.label("streams"), "id", "organization").next();
+ Node person = graphPersistWriter.graph.findNodes(DynamicLabel.label("streams"), "id", "person").next();
+ Assert.assertNotNull(organization);
+ Assert.assertTrue(organization.hasLabel(DynamicLabel.label("streams")));
+ Assert.assertTrue(organization.hasLabel(DynamicLabel.label("organization")));
+ Assert.assertNotNull(person);
+ Assert.assertTrue(person.hasLabel(DynamicLabel.label("streams")));
+ Assert.assertTrue(person.hasLabel(DynamicLabel.label("person")));
+ Assert.assertTrue(person.hasRelationship());
+ Assert.assertTrue(person.hasRelationship(Direction.OUTGOING));
+ Assert.assertTrue(person.hasRelationship(DynamicRelationshipType.withName("join"), Direction.OUTGOING));
+ Assert.assertTrue(person.hasRelationship(DynamicRelationshipType.withName("leave"), Direction.OUTGOING));
+// Iterable < Relationship > relationships = person.getRelationships(Direction.OUTGOING);
+// List<Relationship> relationshipList = Lists.newArrayList(relationships);
+// Assert.assertEquals(relationshipList.size(), 2);
+ Relationship joinRelationship = person.getSingleRelationship(DynamicRelationshipType.withName("join"), Direction.OUTGOING);
+ Assert.assertNotNull(joinRelationship);
+ Node joinRelationshipStart = joinRelationship.getStartNode();
+ Node joinRelationshipEnd = joinRelationship.getEndNode();
+ Assert.assertEquals(joinRelationshipStart, person);
+ Assert.assertEquals(joinRelationshipEnd, organization);
+ Relationship leaveRelationship = person.getSingleRelationship(DynamicRelationshipType.withName("leave"), Direction.OUTGOING);
+ Assert.assertNotNull(leaveRelationship);
+ Node leaveRelationshipStart = leaveRelationship.getStartNode();
+ Node leaveRelationshipEnd = leaveRelationship.getEndNode();
+ Assert.assertEquals(leaveRelationshipStart, person);
+ Assert.assertEquals(leaveRelationshipEnd, organization);
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb1aaea3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
index ec2b1e5..8d76e90 100644
--- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jVertexReader.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.IOUtils;
-import org.apache.streams.graph.GraphConfiguration;
+import org.apache.streams.graph.GraphHttpConfiguration;
import org.apache.streams.graph.GraphReaderConfiguration;
import org.apache.streams.graph.GraphVertexReader;
import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -57,7 +57,7 @@ public class TestNeo4jVertexReader {
public void prepareTest() throws IOException {
testConfiguration = new GraphReaderConfiguration();
- testConfiguration.setType(GraphConfiguration.Type.NEO_4_J);
+ testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
graphPersistReader = new GraphVertexReader(testConfiguration);
InputStream testActivityFileStream = TestNeo4jVertexReader.class.getClassLoader()