You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by steveblackmon <gi...@git.apache.org> on 2017/01/04 16:19:02 UTC

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

GitHub user steveblackmon opened a pull request:

    https://github.com/apache/incubator-streams/pull/348

    STREAMS-344: streams-persist-neo4j

    now that neo4j-java-driver is apache licensed, we can support binding to the binary bolt endpoint to write and read streams data.
    
    everything related to neo4j is now in streams-persist-neo4j, leaving only interfaces in streams-persist-graph for now.
    
    unlike most other persisters, this module expects to find a valid activity or activityobject in the datum, not just any document, but then it is able to insert activityobjects as vertices and activities as edges connecting activityobjects.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/steveblackmon/incubator-streams STREAMS-344

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-streams/pull/348.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #348
    
----
commit 1f4e175cf84a208252d488c2858ea420af0642f9
Author: Steve Blackmon <sb...@apache.org>
Date:   2017-01-03T00:11:01Z

    new neo4j module with bolt:// and http:// support, and tests

commit ee700fd16e8631bdb0fb453d686beef4167af13b
Author: Steve Blackmon <sb...@apache.org>
Date:   2017-01-03T01:42:33Z

    add constructor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by smarthi <gi...@git.apache.org>.
Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95082081
  
    --- Diff: streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.streams.neo4j;
    +
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
    +import org.apache.streams.pojo.json.Activity;
    +import org.apache.streams.pojo.json.ActivityObject;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.javatuples.Pair;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Created by steve on 1/2/17.
    + */
    +public class Neo4jPersistUtil {
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
    +
    +  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
    +
    +  private static CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
    +
    +  public static List<Pair<String, Map<String, Object>>> prepareStatements(StreamsDatum entry) throws Exception {
    +
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
    +
    +    String id = entry.getId();
    +    Activity activity = null;
    +    ActivityObject activityObject = null;
    +    Object document = entry.getDocument();
    +
    +    if (document instanceof Activity) {
    +      activity = (Activity) document;
    +    } else if (document instanceof ActivityObject) {
    +      activityObject = (ActivityObject) document;
    +    } else {
    +      ObjectNode objectNode;
    +      if (document instanceof ObjectNode) {
    +        objectNode = (ObjectNode) document;
    +      } else if ( document instanceof String) {
    +        try {
    +          objectNode = mapper.readValue((String) document, ObjectNode.class);
    +        } catch (IOException ex) {
    +          LOGGER.error("Can't handle input: ", entry);
    +          throw ex;
    +        }
    +      } else {
    +        LOGGER.error("Can't handle input: ", entry);
    +        throw new Exception("Can't create statements from datum.");
    +      }
    +
    +      if ( objectNode.get("verb") != null ) {
    +        try {
    +          activity = mapper.convertValue(objectNode, Activity.class);
    +          activityObject = activity.getObject();
    +        } catch (Exception ex) {
    +          activityObject = mapper.convertValue(objectNode, ActivityObject.class);
    +        }
    +      } else {
    +        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
    +      }
    +
    +    }
    +
    +    Preconditions.checkArgument(activity != null ^ activityObject != null);
    +
    +    if ( activityObject != null && !Strings.isNullOrEmpty(activityObject.getId())) {
    +
    +      statements.add(vertexStatement(activityObject));
    +
    +    } else if ( activity != null && !Strings.isNullOrEmpty(activity.getId())) {
    +
    +      statements.addAll(vertexStatements(activity));
    +
    +      statements.addAll(edgeStatements(activity));
    +
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> vertexStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (actor != null && StringUtils.isNotBlank(actor.getId())) {
    +      Pair<String, Map<String, Object>> actorStatement = vertexStatement(actor);
    +      statements.add(actorStatement);
    +    }
    +
    +    if (object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> objectStatement = vertexStatement(object);
    +      statements.add(objectStatement);
    +    }
    +
    +    if (target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> targetStatement = vertexStatement(target);
    +      statements.add(targetStatement);
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> edgeStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> actorObjectEdgeStatement = helper.createActorObjectEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    +      props.put("props", actorObjectEdgeStatement.getValue1());
    +      actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
    +      statements.add(actorObjectEdgeStatement);
    +    }
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> actorTargetEdgeStatement = helper.createActorTargetEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    +      props.put("props", actorTargetEdgeStatement.getValue1());
    +      actorTargetEdgeStatement = actorTargetEdgeStatement.setAt1(props);
    +      statements.add(actorTargetEdgeStatement);
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static Pair<String, Map<String, Object>> vertexStatement(ActivityObject activityObject) {
    +    Pair<String, Map<String, Object>> mergeVertexRequest = helper.mergeVertexRequest(activityObject);
    +    HashMap<String, Object> props = new HashMap<>();
    --- End diff --
    
    use Map<> on LHS?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams issue #348: STREAMS-344: streams-persist-neo4j

Posted by smarthi <gi...@git.apache.org>.
Github user smarthi commented on the issue:

    https://github.com/apache/incubator-streams/pull/348
  
    +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by smarthi <gi...@git.apache.org>.
Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95082300
  
    --- Diff: streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java ---
    @@ -98,22 +102,24 @@ public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char seperat
         while (iter.hasNext()) {
           Map.Entry<String, JsonNode> item = iter.next();
           String fullKey = item.getKey();
    -      if ( !fullKey.contains(Character.valueOf(seperator).toString())) {
    +      if( !fullKey.contains(Character.valueOf(seperator).toString())) {
             root.put(item.getKey(), item.getValue());
           } else {
             ObjectNode currentNode = root;
    -        List<String> keyParts = new ArrayList<>();
    +        List<String> keyParts = Lists.newArrayList();
    --- End diff --
    
    any reason for switching back to using Guava here, don't see the benefit


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by smarthi <gi...@git.apache.org>.
Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95082073
  
    --- Diff: streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.streams.neo4j;
    +
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
    +import org.apache.streams.pojo.json.Activity;
    +import org.apache.streams.pojo.json.ActivityObject;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.javatuples.Pair;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Created by steve on 1/2/17.
    + */
    +public class Neo4jPersistUtil {
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
    +
    +  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
    +
    +  private static CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
    +
    +  public static List<Pair<String, Map<String, Object>>> prepareStatements(StreamsDatum entry) throws Exception {
    +
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
    +
    +    String id = entry.getId();
    +    Activity activity = null;
    +    ActivityObject activityObject = null;
    +    Object document = entry.getDocument();
    +
    +    if (document instanceof Activity) {
    +      activity = (Activity) document;
    +    } else if (document instanceof ActivityObject) {
    +      activityObject = (ActivityObject) document;
    +    } else {
    +      ObjectNode objectNode;
    +      if (document instanceof ObjectNode) {
    +        objectNode = (ObjectNode) document;
    +      } else if ( document instanceof String) {
    +        try {
    +          objectNode = mapper.readValue((String) document, ObjectNode.class);
    +        } catch (IOException ex) {
    +          LOGGER.error("Can't handle input: ", entry);
    +          throw ex;
    +        }
    +      } else {
    +        LOGGER.error("Can't handle input: ", entry);
    +        throw new Exception("Can't create statements from datum.");
    +      }
    +
    +      if ( objectNode.get("verb") != null ) {
    +        try {
    +          activity = mapper.convertValue(objectNode, Activity.class);
    +          activityObject = activity.getObject();
    +        } catch (Exception ex) {
    +          activityObject = mapper.convertValue(objectNode, ActivityObject.class);
    +        }
    +      } else {
    +        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
    +      }
    +
    +    }
    +
    +    Preconditions.checkArgument(activity != null ^ activityObject != null);
    +
    +    if ( activityObject != null && !Strings.isNullOrEmpty(activityObject.getId())) {
    +
    +      statements.add(vertexStatement(activityObject));
    +
    +    } else if ( activity != null && !Strings.isNullOrEmpty(activity.getId())) {
    +
    +      statements.addAll(vertexStatements(activity));
    +
    +      statements.addAll(edgeStatements(activity));
    +
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> vertexStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (actor != null && StringUtils.isNotBlank(actor.getId())) {
    +      Pair<String, Map<String, Object>> actorStatement = vertexStatement(actor);
    +      statements.add(actorStatement);
    +    }
    +
    +    if (object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> objectStatement = vertexStatement(object);
    +      statements.add(objectStatement);
    +    }
    +
    +    if (target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> targetStatement = vertexStatement(target);
    +      statements.add(targetStatement);
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> edgeStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> actorObjectEdgeStatement = helper.createActorObjectEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    +      props.put("props", actorObjectEdgeStatement.getValue1());
    +      actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
    +      statements.add(actorObjectEdgeStatement);
    +    }
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> actorTargetEdgeStatement = helper.createActorTargetEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    --- End diff --
    
    use Map<> on LHS


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95085437
  
    --- Diff: streams-util/src/main/java/org/apache/streams/util/PropertyUtil.java ---
    @@ -98,22 +102,24 @@ public static ObjectNode unflattenObjectNode(ObjectNode flatObject, char seperat
         while (iter.hasNext()) {
           Map.Entry<String, JsonNode> item = iter.next();
           String fullKey = item.getKey();
    -      if ( !fullKey.contains(Character.valueOf(seperator).toString())) {
    +      if( !fullKey.contains(Character.valueOf(seperator).toString())) {
             root.put(item.getKey(), item.getValue());
           } else {
             ObjectNode currentNode = root;
    -        List<String> keyParts = new ArrayList<>();
    +        List<String> keyParts = Lists.newArrayList();
    --- End diff --
    
    \u2714\ufe0f removed guava from this class



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by smarthi <gi...@git.apache.org>.
Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95082088
  
    --- Diff: streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.streams.neo4j.bolt;
    +
    +import org.apache.streams.neo4j.Neo4jConfiguration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.neo4j.driver.v1.AuthToken;
    +import org.neo4j.driver.v1.AuthTokens;
    +import org.neo4j.driver.v1.Driver;
    +import org.neo4j.driver.v1.GraphDatabase;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +public class Neo4jBoltClient {
    +
    +    private static final Logger LOGGER = LoggerFactory
    +            .getLogger(Neo4jBoltClient.class);
    +
    +    private Driver client;
    +
    +    public Neo4jConfiguration config;
    +
    +    private Neo4jBoltClient(Neo4jConfiguration neo4jConfiguration) {
    +        this.config = neo4jConfiguration;
    +        try {
    +            this.start();
    +        } catch (Exception e) {
    +            e.printStackTrace();
    +            this.client = null;
    +        }
    +    }
    +
    +    private static Map<Neo4jConfiguration, Neo4jBoltClient> INSTANCE_MAP = new ConcurrentHashMap<Neo4jConfiguration, Neo4jBoltClient>();
    +
    +    public static Neo4jBoltClient getInstance(Neo4jConfiguration neo4jConfiguration) {
    +        if (    INSTANCE_MAP != null &&
    --- End diff --
    
    extra space after openning brace - style check nitpick


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95085386
  
    --- Diff: streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.streams.neo4j;
    +
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
    +import org.apache.streams.pojo.json.Activity;
    +import org.apache.streams.pojo.json.ActivityObject;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.javatuples.Pair;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Created by steve on 1/2/17.
    + */
    +public class Neo4jPersistUtil {
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
    +
    +  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
    +
    +  private static CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
    +
    +  public static List<Pair<String, Map<String, Object>>> prepareStatements(StreamsDatum entry) throws Exception {
    +
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
    +
    +    String id = entry.getId();
    +    Activity activity = null;
    +    ActivityObject activityObject = null;
    +    Object document = entry.getDocument();
    +
    +    if (document instanceof Activity) {
    +      activity = (Activity) document;
    +    } else if (document instanceof ActivityObject) {
    +      activityObject = (ActivityObject) document;
    +    } else {
    +      ObjectNode objectNode;
    +      if (document instanceof ObjectNode) {
    +        objectNode = (ObjectNode) document;
    +      } else if ( document instanceof String) {
    +        try {
    +          objectNode = mapper.readValue((String) document, ObjectNode.class);
    +        } catch (IOException ex) {
    +          LOGGER.error("Can't handle input: ", entry);
    +          throw ex;
    +        }
    +      } else {
    +        LOGGER.error("Can't handle input: ", entry);
    +        throw new Exception("Can't create statements from datum.");
    +      }
    +
    +      if ( objectNode.get("verb") != null ) {
    +        try {
    +          activity = mapper.convertValue(objectNode, Activity.class);
    +          activityObject = activity.getObject();
    +        } catch (Exception ex) {
    +          activityObject = mapper.convertValue(objectNode, ActivityObject.class);
    +        }
    +      } else {
    +        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
    +      }
    +
    +    }
    +
    +    Preconditions.checkArgument(activity != null ^ activityObject != null);
    +
    +    if ( activityObject != null && !Strings.isNullOrEmpty(activityObject.getId())) {
    +
    +      statements.add(vertexStatement(activityObject));
    +
    +    } else if ( activity != null && !Strings.isNullOrEmpty(activity.getId())) {
    +
    +      statements.addAll(vertexStatements(activity));
    +
    +      statements.addAll(edgeStatements(activity));
    +
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> vertexStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (actor != null && StringUtils.isNotBlank(actor.getId())) {
    +      Pair<String, Map<String, Object>> actorStatement = vertexStatement(actor);
    +      statements.add(actorStatement);
    +    }
    +
    +    if (object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> objectStatement = vertexStatement(object);
    +      statements.add(objectStatement);
    +    }
    +
    +    if (target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> targetStatement = vertexStatement(target);
    +      statements.add(targetStatement);
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> edgeStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> actorObjectEdgeStatement = helper.createActorObjectEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    +      props.put("props", actorObjectEdgeStatement.getValue1());
    +      actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
    +      statements.add(actorObjectEdgeStatement);
    +    }
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> actorTargetEdgeStatement = helper.createActorTargetEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    --- End diff --
    
    \u2714\ufe0f 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95085431
  
    --- Diff: streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.streams.neo4j.bolt;
    +
    +import org.apache.streams.neo4j.Neo4jConfiguration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.neo4j.driver.v1.AuthToken;
    +import org.neo4j.driver.v1.AuthTokens;
    +import org.neo4j.driver.v1.Driver;
    +import org.neo4j.driver.v1.GraphDatabase;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +public class Neo4jBoltClient {
    +
    +    private static final Logger LOGGER = LoggerFactory
    +            .getLogger(Neo4jBoltClient.class);
    +
    +    private Driver client;
    +
    +    public Neo4jConfiguration config;
    +
    +    private Neo4jBoltClient(Neo4jConfiguration neo4jConfiguration) {
    +        this.config = neo4jConfiguration;
    +        try {
    +            this.start();
    +        } catch (Exception e) {
    +            e.printStackTrace();
    +            this.client = null;
    +        }
    +    }
    +
    +    private static Map<Neo4jConfiguration, Neo4jBoltClient> INSTANCE_MAP = new ConcurrentHashMap<Neo4jConfiguration, Neo4jBoltClient>();
    +
    +    public static Neo4jBoltClient getInstance(Neo4jConfiguration neo4jConfiguration) {
    +        if (    INSTANCE_MAP != null &&
    --- End diff --
    
    \u2714\ufe0f 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by smarthi <gi...@git.apache.org>.
Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95082122
  
    --- Diff: streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java ---
    @@ -0,0 +1,84 @@
    +package org.apache.streams.neo4j.http;
    +
    +import org.apache.streams.neo4j.Neo4jConfiguration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import org.apache.http.client.HttpClient;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +public class Neo4jHttpClient {
    +
    +    private static final Logger LOGGER = LoggerFactory
    +            .getLogger(Neo4jHttpClient.class);
    +
    +    public Neo4jConfiguration config;
    +
    +    private HttpClient client;
    +
    +    private Neo4jHttpClient(Neo4jConfiguration neo4jConfiguration) {
    +        this.config = neo4jConfiguration;
    +        try {
    +            this.start();
    +        } catch (Exception e) {
    +            e.printStackTrace();
    +            this.client = null;
    +        }
    +    }
    +
    +    private static Map<Neo4jConfiguration, Neo4jHttpClient> INSTANCE_MAP = new ConcurrentHashMap<Neo4jConfiguration, Neo4jHttpClient>();
    +
    +    public static Neo4jHttpClient getInstance(Neo4jConfiguration neo4jConfiguration) {
    +        if (    INSTANCE_MAP != null &&
    +                INSTANCE_MAP.size() > 0 &&
    +                INSTANCE_MAP.containsKey(neo4jConfiguration)
    +                )
    +            return INSTANCE_MAP.get(neo4jConfiguration);
    +        else {
    +            Neo4jHttpClient instance = new Neo4jHttpClient(neo4jConfiguration);
    +            if( instance != null && instance.client != null ) {
    +                INSTANCE_MAP.put(neo4jConfiguration, instance);
    +                return instance;
    +            } else {
    +                return null;
    +            }
    +        }
    +    }
    +
    +    public void start() throws Exception {
    +
    +        Preconditions.checkNotNull(config);
    +        Preconditions.checkArgument(
    +                config.getScheme().startsWith("http")
    +        );
    +
    +        LOGGER.info("Neo4jConfiguration.start {}", config);
    +
    +        Preconditions.checkNotNull(client);
    --- End diff --
    
    Replace by Objects.requireNotNull()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-streams/pull/348


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95085389
  
    --- Diff: streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.streams.neo4j;
    +
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
    +import org.apache.streams.pojo.json.Activity;
    +import org.apache.streams.pojo.json.ActivityObject;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.javatuples.Pair;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Created by steve on 1/2/17.
    + */
    +public class Neo4jPersistUtil {
    +
    +  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
    +
    +  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
    +
    +  private static CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
    +
    +  public static List<Pair<String, Map<String, Object>>> prepareStatements(StreamsDatum entry) throws Exception {
    +
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
    +
    +    String id = entry.getId();
    +    Activity activity = null;
    +    ActivityObject activityObject = null;
    +    Object document = entry.getDocument();
    +
    +    if (document instanceof Activity) {
    +      activity = (Activity) document;
    +    } else if (document instanceof ActivityObject) {
    +      activityObject = (ActivityObject) document;
    +    } else {
    +      ObjectNode objectNode;
    +      if (document instanceof ObjectNode) {
    +        objectNode = (ObjectNode) document;
    +      } else if ( document instanceof String) {
    +        try {
    +          objectNode = mapper.readValue((String) document, ObjectNode.class);
    +        } catch (IOException ex) {
    +          LOGGER.error("Can't handle input: ", entry);
    +          throw ex;
    +        }
    +      } else {
    +        LOGGER.error("Can't handle input: ", entry);
    +        throw new Exception("Can't create statements from datum.");
    +      }
    +
    +      if ( objectNode.get("verb") != null ) {
    +        try {
    +          activity = mapper.convertValue(objectNode, Activity.class);
    +          activityObject = activity.getObject();
    +        } catch (Exception ex) {
    +          activityObject = mapper.convertValue(objectNode, ActivityObject.class);
    +        }
    +      } else {
    +        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
    +      }
    +
    +    }
    +
    +    Preconditions.checkArgument(activity != null ^ activityObject != null);
    +
    +    if ( activityObject != null && !Strings.isNullOrEmpty(activityObject.getId())) {
    +
    +      statements.add(vertexStatement(activityObject));
    +
    +    } else if ( activity != null && !Strings.isNullOrEmpty(activity.getId())) {
    +
    +      statements.addAll(vertexStatements(activity));
    +
    +      statements.addAll(edgeStatements(activity));
    +
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> vertexStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (actor != null && StringUtils.isNotBlank(actor.getId())) {
    +      Pair<String, Map<String, Object>> actorStatement = vertexStatement(actor);
    +      statements.add(actorStatement);
    +    }
    +
    +    if (object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> objectStatement = vertexStatement(object);
    +      statements.add(objectStatement);
    +    }
    +
    +    if (target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> targetStatement = vertexStatement(target);
    +      statements.add(targetStatement);
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> edgeStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> actorObjectEdgeStatement = helper.createActorObjectEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    +      props.put("props", actorObjectEdgeStatement.getValue1());
    +      actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
    +      statements.add(actorObjectEdgeStatement);
    +    }
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> actorTargetEdgeStatement = helper.createActorTargetEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    +      props.put("props", actorTargetEdgeStatement.getValue1());
    +      actorTargetEdgeStatement = actorTargetEdgeStatement.setAt1(props);
    +      statements.add(actorTargetEdgeStatement);
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static Pair<String, Map<String, Object>> vertexStatement(ActivityObject activityObject) {
    +    Pair<String, Map<String, Object>> mergeVertexRequest = helper.mergeVertexRequest(activityObject);
    +    HashMap<String, Object> props = new HashMap<>();
    --- End diff --
    
    \u2714\ufe0f 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request #348: STREAMS-344: streams-persist-neo4j

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95085434
  
    --- Diff: streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/http/Neo4jHttpClient.java ---
    @@ -0,0 +1,84 @@
    +package org.apache.streams.neo4j.http;
    +
    +import org.apache.streams.neo4j.Neo4jConfiguration;
    +
    +import com.google.common.base.Preconditions;
    +
    +import org.apache.http.client.HttpClient;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +public class Neo4jHttpClient {
    +
    +    private static final Logger LOGGER = LoggerFactory
    +            .getLogger(Neo4jHttpClient.class);
    +
    +    public Neo4jConfiguration config;
    +
    +    private HttpClient client;
    +
    +    private Neo4jHttpClient(Neo4jConfiguration neo4jConfiguration) {
    +        this.config = neo4jConfiguration;
    +        try {
    +            this.start();
    +        } catch (Exception e) {
    +            e.printStackTrace();
    +            this.client = null;
    +        }
    +    }
    +
    +    private static Map<Neo4jConfiguration, Neo4jHttpClient> INSTANCE_MAP = new ConcurrentHashMap<Neo4jConfiguration, Neo4jHttpClient>();
    +
    +    public static Neo4jHttpClient getInstance(Neo4jConfiguration neo4jConfiguration) {
    +        if (    INSTANCE_MAP != null &&
    +                INSTANCE_MAP.size() > 0 &&
    +                INSTANCE_MAP.containsKey(neo4jConfiguration)
    +                )
    +            return INSTANCE_MAP.get(neo4jConfiguration);
    +        else {
    +            Neo4jHttpClient instance = new Neo4jHttpClient(neo4jConfiguration);
    +            if( instance != null && instance.client != null ) {
    +                INSTANCE_MAP.put(neo4jConfiguration, instance);
    +                return instance;
    +            } else {
    +                return null;
    +            }
    +        }
    +    }
    +
    +    public void start() throws Exception {
    +
    +        Preconditions.checkNotNull(config);
    +        Preconditions.checkArgument(
    +                config.getScheme().startsWith("http")
    +        );
    +
    +        LOGGER.info("Neo4jConfiguration.start {}", config);
    +
    +        Preconditions.checkNotNull(client);
    --- End diff --
    
    \u2714\ufe0f 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---