You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/10/30 19:14:49 UTC

[1/2] incubator-streams git commit: resolves STREAMS-370

Repository: incubator-streams
Updated Branches:
  refs/heads/master 629f54076 -> 986867dc5


resolves STREAMS-370


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5a2b143a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5a2b143a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5a2b143a

Branch: refs/heads/master
Commit: 5a2b143a195c21093a351cac86675b6f4514669e
Parents: 1470b92
Author: Steve Blackmon <sb...@apache.org>
Authored: Wed Oct 14 14:25:31 2015 -0500
Committer: Steve Blackmon <sb...@apache.org>
Committed: Wed Oct 14 14:25:31 2015 -0500

----------------------------------------------------------------------
 .../persist/SimpleHTTPPostPersistWriter.java    | 25 ++++++++++-
 .../streams/graph/GraphHttpPersistWriter.java   | 13 +++---
 .../streams/graph/neo4j/CypherGraphHelper.java  | 25 +++++++++--
 .../graph/neo4j/CypherQueryGraphHelper.java     | 46 +++++++++++++++-----
 4 files changed, 85 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a2b143a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
index f9d2103..3df4747 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
@@ -22,7 +22,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -64,6 +66,8 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
 
     protected HttpPersistWriterConfiguration configuration;
 
+    protected String authHeader;
+
     public SimpleHTTPPostPersistWriter() {
         this(HttpConfigurator.detectPersistWriterConfiguration(StreamsConfigurator.config.getConfig("http")));
     }
@@ -129,7 +133,12 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
      */
     protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
 
-        return (ObjectNode) entry.getDocument();
+        if( entry.getDocument() != null ) {
+            if( entry.getDocument() instanceof ObjectNode )
+                return (ObjectNode) entry.getDocument();
+            else return mapper.convertValue(entry.getDocument(), ObjectNode.class);
+        }
+        else return null;
     }
 
     /**
@@ -139,6 +148,8 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
         HttpPost httppost = new HttpPost(uri);
         httppost.addHeader("content-type", this.configuration.getContentType());
         httppost.addHeader("accept-charset", "UTF-8");
+        if( !Strings.isNullOrEmpty(authHeader))
+            httppost.addHeader("Authorization", "Basic " + authHeader);
         try {
             String entity = mapper.writeValueAsString(payload);
             httppost.setEntity(new StringEntity(entity));
@@ -188,6 +199,18 @@ public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
                 .setPort(this.configuration.getPort().intValue())
                 .setPath(this.configuration.getResourcePath());
 
+        if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
+            uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
+        if( !Strings.isNullOrEmpty(configuration.getUsername())
+                && !Strings.isNullOrEmpty(configuration.getPassword())) {
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(configuration.getUsername());
+            stringBuilder.append(":");
+            stringBuilder.append(configuration.getPassword());
+            String string = stringBuilder.toString();
+            authHeader = Base64.encodeBase64String(string.getBytes());
+        }
+
         httpclient = HttpClients.createDefault();
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a2b143a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
index 298c628..a26168c 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
@@ -32,7 +32,6 @@ import org.apache.streams.components.http.HttpPersistWriterConfiguration;
 import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.TypeConverterUtil;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
 import org.apache.streams.graph.neo4j.Neo4jHttpGraphHelper;
@@ -76,7 +75,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
     public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
         super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class));
         if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit");
+            super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit/");
         }
         else if( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
             super.configuration.setResourcePath("/graphs/" + configuration.getGraph());
@@ -139,10 +138,11 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
             statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
         }
 
-        Actor actor = activity.getActor();
-        Provider provider = activity.getProvider();
-
         if( activity != null ) {
+
+            Actor actor = activity.getActor();
+            Provider provider = activity.getProvider();
+
             if( provider != null &&
                     !Strings.isNullOrEmpty(provider.getId()) ) {
                 labels.add(provider.getId());
@@ -168,7 +168,6 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
             }
         }
 
-
         request.put("statements", statements);
         return request;
 
@@ -218,7 +217,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
     @Override
     public void prepare(Object configurationObject) {
 
-        super.prepare(configurationObject);
+        super.prepare(configuration);
         mapper = StreamsJacksonMapper.getInstance();
 
         if( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a2b143a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
index d01f6d5..8028350 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
@@ -47,12 +47,13 @@ public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper {
     public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
 
     public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
-                                                                "CREATE UNIQUE (n:<type> { props }) "+
-                                                                "RETURN n";
+                                                                "CREATE UNIQUE (v:<type> { props }) "+
+                                                                "ON CREATE SET v <labels> "+
+                                                                "RETURN v";
 
     public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
-                                                               "ON CREATE SET v:<type>, v = { props }, v.`@timestamp` = timestamp() "+
-                                                               "ON MATCH SET v = { props }, v.`@timestamp` = timestamp() "+
+                                                               "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+                                                               "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
                                                                "RETURN v";
 
     public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
@@ -88,9 +89,17 @@ public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper {
 
         ObjectNode request = mapper.createObjectNode();
 
+        List<String> labels = Lists.newArrayList();
+        if( activityObject.getAdditionalProperties().containsKey("labels") ) {
+            List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+            for( String extraLabel : extraLabels )
+                labels.add(":"+extraLabel);
+        }
+
         ST createVertex = new ST(createVertexStatementTemplate);
         createVertex.add("id", activityObject.getId());
         createVertex.add("type", activityObject.getObjectType());
+        createVertex.add("labels", Joiner.on(' ').join(labels));
         request.put(statementKey, createVertex.render());
 
         ObjectNode params = mapper.createObjectNode();
@@ -108,9 +117,17 @@ public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper {
 
         ObjectNode request = mapper.createObjectNode();
 
+        List<String> labels = Lists.newArrayList();
+        if( activityObject.getAdditionalProperties().containsKey("labels") ) {
+            List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+            for( String extraLabel : extraLabels )
+                labels.add(":"+extraLabel);
+        }
+
         ST mergeVertex = new ST(mergeVertexStatementTemplate);
         mergeVertex.add("id", activityObject.getId());
         mergeVertex.add("type", activityObject.getObjectType());
+        mergeVertex.add("labels", Joiner.on(' ').join(labels));
 
         ObjectNode params = mapper.createObjectNode();
         ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5a2b143a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
index 99912c7..86ab72f 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
@@ -50,17 +50,18 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
     public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
 
     public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
-                                                                "CREATE UNIQUE (n:<type> { props }) "+
-                                                                "RETURN n";
+            "CREATE UNIQUE (v:<type> { props }) "+
+            "ON CREATE SET v <labels> "+
+            "RETURN v";
 
     public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
-                                                               "ON CREATE SET v:<type>, v = { props }, v.`@timestamp` = timestamp() "+
-                                                               "ON MATCH SET v = { props }, v.`@timestamp` = timestamp() "+
-                                                               "RETURN v";
+            "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+            "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+            "RETURN v";
 
     public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
-                                                            "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
-                                                            "RETURN r";
+            "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
+            "RETURN r";
 
     public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
 
@@ -92,16 +93,21 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
 
         Preconditions.checkNotNull(activityObject.getObjectType());
 
+        List<String> labels = getLabels(activityObject);
+
         ST createVertex = new ST(createVertexStatementTemplate);
         createVertex.add("id", activityObject.getId());
         createVertex.add("type", activityObject.getObjectType());
+        if( labels.size() > 0)
+            createVertex.add("labels", Joiner.on(' ').join(labels));
+        String query = createVertex.render();
 
         ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
         Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
 
         Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props);
 
-        LOGGER.debug("createVertexRequest", queryPlusParameters.toString());
+        LOGGER.debug("createVertexRequest: ({},{})", query, props);
 
         return queryPlusParameters;
     }
@@ -112,16 +118,22 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
 
         Pair queryPlusParameters = new Pair(null, Maps.newHashMap());
 
+        List<String> labels = getLabels(activityObject);
+
         ST mergeVertex = new ST(mergeVertexStatementTemplate);
         mergeVertex.add("id", activityObject.getId());
         mergeVertex.add("type", activityObject.getObjectType());
-        queryPlusParameters = queryPlusParameters.setAt0(mergeVertex.render());
+        if( labels.size() > 0)
+            mergeVertex.add("labels", Joiner.on(' ').join(labels));
+        String query = mergeVertex.render();
 
         ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
         Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-        queryPlusParameters = queryPlusParameters.setAt1(props);
 
-        LOGGER.debug("mergeVertexRequest", queryPlusParameters.toString());
+        LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
+
+        queryPlusParameters = queryPlusParameters.setAt0(query);
+        queryPlusParameters = queryPlusParameters.setAt1(props);
 
         return queryPlusParameters;
     }
@@ -152,7 +164,7 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
         queryPlusParameters = queryPlusParameters.setAt0(statement);
         queryPlusParameters = queryPlusParameters.setAt1(props);
 
-        LOGGER.debug("createEdgeRequest", queryPlusParameters);
+        LOGGER.debug("createEdgeRequest: ({},{})", statement, props);
 
         return queryPlusParameters;
     }
@@ -194,4 +206,14 @@ public class CypherQueryGraphHelper implements QueryGraphHelper {
         return builder.toString();
     }
 
+    private List<String> getLabels(ActivityObject activityObject) {
+        List<String> labels = Lists.newArrayList(":streams");
+        if( activityObject.getAdditionalProperties().containsKey("labels") ) {
+            List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+            for( String extraLabel : extraLabels )
+                labels.add(":"+extraLabel);
+        }
+        return labels;
+    }
+
 }


[2/2] incubator-streams git commit: Merge branch 'STREAMS-370'

Posted by sb...@apache.org.
Merge branch 'STREAMS-370'

* STREAMS-370:
  resolves STREAMS-370


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/986867dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/986867dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/986867dc

Branch: refs/heads/master
Commit: 986867dc5d5f4ff91de16142017fd5d8a621bb19
Parents: 629f540 5a2b143
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Oct 30 12:16:52 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Oct 30 12:16:52 2015 -0500

----------------------------------------------------------------------
 .../persist/SimpleHTTPPostPersistWriter.java    | 25 ++++++++++-
 .../streams/graph/GraphHttpPersistWriter.java   | 13 +++---
 .../streams/graph/neo4j/CypherGraphHelper.java  | 25 +++++++++--
 .../graph/neo4j/CypherQueryGraphHelper.java     | 46 +++++++++++++++-----
 4 files changed, 85 insertions(+), 24 deletions(-)
----------------------------------------------------------------------