You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/10/30 19:14:49 UTC
[1/2] incubator-streams git commit: resolves STREAMS-370
Repository: incubator-streams
Updated Branches:
refs/heads/master 629f54076 -> 986867dc5
resolves STREAMS-370
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5a2b143a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5a2b143a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5a2b143a
Branch: refs/heads/master
Commit: 5a2b143a195c21093a351cac86675b6f4514669e
Parents: 1470b92
Author: Steve Blackmon <sb...@apache.org>
Authored: Wed Oct 14 14:25:31 2015 -0500
Committer: Steve Blackmon <sb...@apache.org>
Committed: Wed Oct 14 14:25:31 2015 -0500
----------------------------------------------------------------------
.../persist/SimpleHTTPPostPersistWriter.java | 25 ++++++++++-
.../streams/graph/GraphHttpPersistWriter.java | 13 +++---
.../streams/graph/neo4j/CypherGraphHelper.java | 25 +++++++++--
.../graph/neo4j/CypherQueryGraphHelper.java | 46 +++++++++++++++-----
4 files changed, 85 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a2b143a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
index f9d2103..3df4747 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
@@ -22,7 +22,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Maps;
+import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -64,6 +66,8 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
protected HttpPersistWriterConfiguration configuration;
+ protected String authHeader;
+
public SimpleHTTPPostPersistWriter() {
this(HttpConfigurator.detectPersistWriterConfiguration(StreamsConfigurator.config.getConfig("http")));
}
@@ -129,7 +133,12 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
*/
protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
- return (ObjectNode) entry.getDocument();
+ if( entry.getDocument() != null ) {
+ if( entry.getDocument() instanceof ObjectNode )
+ return (ObjectNode) entry.getDocument();
+ else return mapper.convertValue(entry.getDocument(), ObjectNode.class);
+ }
+ else return null;
}
/**
@@ -139,6 +148,8 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
HttpPost httppost = new HttpPost(uri);
httppost.addHeader("content-type", this.configuration.getContentType());
httppost.addHeader("accept-charset", "UTF-8");
+ if( !Strings.isNullOrEmpty(authHeader))
+ httppost.addHeader("Authorization", "Basic " + authHeader);
try {
String entity = mapper.writeValueAsString(payload);
httppost.setEntity(new StringEntity(entity));
@@ -188,6 +199,18 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
.setPort(this.configuration.getPort().intValue())
.setPath(this.configuration.getResourcePath());
+ if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
+ uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
+ if( !Strings.isNullOrEmpty(configuration.getUsername())
+ && !Strings.isNullOrEmpty(configuration.getPassword())) {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(configuration.getUsername());
+ stringBuilder.append(":");
+ stringBuilder.append(configuration.getPassword());
+ String string = stringBuilder.toString();
+ authHeader = Base64.encodeBase64String(string.getBytes());
+ }
+
httpclient = HttpClients.createDefault();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a2b143a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
index 298c628..a26168c 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
@@ -32,7 +32,6 @@ import org.apache.streams.components.http.HttpPersistWriterConfiguration;
import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.TypeConverterUtil;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
import org.apache.streams.graph.neo4j.Neo4jHttpGraphHelper;
@@ -76,7 +75,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class));
if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
- super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
+ super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit/");
}
else if( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
@@ -139,10 +138,11 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
}
- Actor actor = activity.getActor();
- Provider provider = activity.getProvider();
-
if( activity != null ) {
+
+ Actor actor = activity.getActor();
+ Provider provider = activity.getProvider();
+
if( provider != null &&
!Strings.isNullOrEmpty(provider.getId()) ) {
labels.add(provider.getId());
@@ -168,7 +168,6 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
}
}
-
request.put("statements", statements);
return request;
@@ -218,7 +217,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
@Override
public void prepare(Object configurationObject) {
- super.prepare(configurationObject);
+ super.prepare(configuration);
mapper = StreamsJacksonMapper.getInstance();
if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a2b143a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
index d01f6d5..8028350 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
@@ -47,12 +47,13 @@ public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper {
public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
- "CREATE UNIQUE (n:<type> { props }) "+
- "RETURN n";
+ "CREATE UNIQUE (v:<type> { props }) "+
+ "ON CREATE SET v <labels> "+
+ "RETURN v";
public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
- "ON CREATE SET v:<type>, v = { props }, v.`@timestamp` = timestamp() "+
- "ON MATCH SET v = { props }, v.`@timestamp` = timestamp() "+
+ "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+ "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
"RETURN v";
public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
@@ -88,9 +89,17 @@ public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper {
ObjectNode request = mapper.createObjectNode();
+ List<String> labels = Lists.newArrayList();
+ if( activityObject.getAdditionalProperties().containsKey("labels") ) {
+ List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+ for( String extraLabel : extraLabels )
+ labels.add(":"+extraLabel);
+ }
+
ST createVertex = new ST(createVertexStatementTemplate);
createVertex.add("id", activityObject.getId());
createVertex.add("type", activityObject.getObjectType());
+ createVertex.add("labels", Joiner.on(' ').join(labels));
request.put(statementKey, createVertex.render());
ObjectNode params = mapper.createObjectNode();
@@ -108,9 +117,17 @@ public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper {
ObjectNode request = mapper.createObjectNode();
+ List<String> labels = Lists.newArrayList();
+ if( activityObject.getAdditionalProperties().containsKey("labels") ) {
+ List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+ for( String extraLabel : extraLabels )
+ labels.add(":"+extraLabel);
+ }
+
ST mergeVertex = new ST(mergeVertexStatementTemplate);
mergeVertex.add("id", activityObject.getId());
mergeVertex.add("type", activityObject.getObjectType());
+ mergeVertex.add("labels", Joiner.on(' ').join(labels));
ObjectNode params = mapper.createObjectNode();
ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a2b143a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
index 99912c7..86ab72f 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
@@ -50,17 +50,18 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
- "CREATE UNIQUE (n:<type> { props }) "+
- "RETURN n";
+ "CREATE UNIQUE (v:<type> { props }) "+
+ "ON CREATE SET v <labels> "+
+ "RETURN v";
public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
- "ON CREATE SET v:<type>, v = { props }, v.`@timestamp` = timestamp() "+
- "ON MATCH SET v = { props }, v.`@timestamp` = timestamp() "+
- "RETURN v";
+ "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+ "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+ "RETURN v";
public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
- "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
- "RETURN r";
+ "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
+ "RETURN r";
public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
@@ -92,16 +93,21 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
Preconditions.checkNotNull(activityObject.getObjectType());
+ List<String> labels = getLabels(activityObject);
+
ST createVertex = new ST(createVertexStatementTemplate);
createVertex.add("id", activityObject.getId());
createVertex.add("type", activityObject.getObjectType());
+ if( labels.size() > 0)
+ createVertex.add("labels", Joiner.on(' ').join(labels));
+ String query = createVertex.render();
ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props);
- LOGGER.debug("createVertexRequest", queryPlusParameters.toString());
+ LOGGER.debug("createVertexRequest: ({},{})", query, props);
return queryPlusParameters;
}
@@ -112,16 +118,22 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
Pair queryPlusParameters = new Pair(null, Maps.newHashMap());
+ List<String> labels = getLabels(activityObject);
+
ST mergeVertex = new ST(mergeVertexStatementTemplate);
mergeVertex.add("id", activityObject.getId());
mergeVertex.add("type", activityObject.getObjectType());
- queryPlusParameters = queryPlusParameters.setAt0(mergeVertex.render());
+ if( labels.size() > 0)
+ mergeVertex.add("labels", Joiner.on(' ').join(labels));
+ String query = mergeVertex.render();
ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
- queryPlusParameters = queryPlusParameters.setAt1(props);
- LOGGER.debug("mergeVertexRequest", queryPlusParameters.toString());
+ LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
+
+ queryPlusParameters = queryPlusParameters.setAt0(query);
+ queryPlusParameters = queryPlusParameters.setAt1(props);
return queryPlusParameters;
}
@@ -152,7 +164,7 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
queryPlusParameters = queryPlusParameters.setAt0(statement);
queryPlusParameters = queryPlusParameters.setAt1(props);
- LOGGER.debug("createEdgeRequest", queryPlusParameters);
+ LOGGER.debug("createEdgeRequest: ({},{})", statement, props);
return queryPlusParameters;
}
@@ -194,4 +206,14 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
return builder.toString();
}
+ private List<String> getLabels(ActivityObject activityObject) {
+ List<String> labels = Lists.newArrayList(":streams");
+ if( activityObject.getAdditionalProperties().containsKey("labels") ) {
+ List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+ for( String extraLabel : extraLabels )
+ labels.add(":"+extraLabel);
+ }
+ return labels;
+ }
+
}
[2/2] incubator-streams git commit: Merge branch 'STREAMS-370'
Posted by sb...@apache.org.
Merge branch 'STREAMS-370'
* STREAMS-370:
resolves STREAMS-370
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/986867dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/986867dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/986867dc
Branch: refs/heads/master
Commit: 986867dc5d5f4ff91de16142017fd5d8a621bb19
Parents: 629f540 5a2b143
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Oct 30 12:16:52 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Oct 30 12:16:52 2015 -0500
----------------------------------------------------------------------
.../persist/SimpleHTTPPostPersistWriter.java | 25 ++++++++++-
.../streams/graph/GraphHttpPersistWriter.java | 13 +++---
.../streams/graph/neo4j/CypherGraphHelper.java | 25 +++++++++--
.../graph/neo4j/CypherQueryGraphHelper.java | 46 +++++++++++++++-----
4 files changed, 85 insertions(+), 24 deletions(-)
----------------------------------------------------------------------