You are viewing a plain-text version of this content. Its canonical link was a hyperlink on the original page and is not preserved in this plain-text rendering.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/20 03:21:14 UTC

[1/5] git commit: Resolves https://issues.apache.org/jira/browse/STREAMS-70

Repository: incubator-streams
Updated Branches:
  refs/heads/master aa376fdf4 -> 45dee67d1


Resolves https://issues.apache.org/jira/browse/STREAMS-70


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f2383db8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f2383db8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f2383db8

Branch: refs/heads/master
Commit: f2383db8f4cfddff1145a93025a7c9051aeaa3af
Parents: e6ffe29
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon May 5 13:31:49 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon May 5 13:31:49 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 streams-contrib/streams-processor-json/pom.xml  |  69 +++++++++
 .../apache/streams/json/JsonPathExtractor.java  | 119 +++++++++++++++
 .../org/apache/streams/json/JsonPathFilter.java | 149 +++++++++++++++++++
 .../json/test/JsonPathExtractorTest.java        |  88 +++++++++++
 .../src/test/resources/books.json               |  21 +++
 6 files changed, 447 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index d80fc63..d6873bb 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -46,6 +46,7 @@
         <module>streams-persist-mongo</module>
         <!--<module>streams-processor-lucene</module>-->
         <!--<module>streams-processor-tika</module>-->
+        <module>streams-processor-json</module>
         <module>streams-processor-urls</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/pom.xml b/streams-contrib/streams-processor-json/pom.xml
new file mode 100644
index 0000000..27695bc
--- /dev/null
+++ b/streams-contrib/streams-processor-json/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-processor-json</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path-assert</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
new file mode 100644
index 0000000..eab1da1
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
@@ -0,0 +1,119 @@
+package org.apache.streams.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import com.jayway.jsonpath.JsonPath;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class JsonPathExtractor implements StreamsProcessor {
+
+    public JsonPathExtractor() {
+        System.out.println("creating JsonPathExtractor");
+    }
+
+    public JsonPathExtractor(String pathExpression) {
+        this.pathExpression = pathExpression;
+        System.out.println("creating JsonPathExtractor for " + this.pathExpression);
+    }
+
+    private final static String STREAMS_ID = "JsonPathExtractor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractor.class);
+
+    private ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private String pathExpression;
+    private JsonPath jsonPath;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        String json = null;
+
+        LOGGER.debug("{} processing {}", STREAMS_ID);
+
+        if( entry.getDocument() instanceof ObjectNode ) {
+            ObjectNode node = (ObjectNode) entry.getDocument();
+            try {
+                json = mapper.writeValueAsString(node);
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+            }
+        } else if( entry.getDocument() instanceof String ) {
+            json = (String) entry.getDocument();
+        }
+
+        if( StringUtils.isNotEmpty(json)) {
+
+            try {
+                Object readResult = jsonPath.read(json);
+
+                if (readResult instanceof String) {
+                    String match = (String) readResult;
+                    StreamsDatum matchDatum = new StreamsDatum(match);
+                    result.add(matchDatum);
+                } else if (readResult instanceof JSONObject) {
+                    JSONObject match = (JSONObject) readResult;
+                    ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
+                    StreamsDatum matchDatum = new StreamsDatum(objectNode);
+                    result.add(matchDatum);
+                } else if (readResult instanceof JSONArray) {
+                    JSONArray array = (JSONArray) readResult;
+                    Iterator iterator = array.iterator();
+                    while (iterator.hasNext()) {
+                        Object item = iterator.next();
+                        if( item instanceof String ) {
+                            String match = (String) item;
+                            StreamsDatum matchDatum = new StreamsDatum(match);
+                            result.add(matchDatum);
+                        } else if ( item instanceof JSONObject ) {
+                            StreamsDatum matchDatum = new StreamsDatum(item);
+                            result.add(matchDatum);
+                        }
+                    }
+                } else {
+
+                }
+
+            } catch( Exception e ) {
+                e.printStackTrace();
+                LOGGER.warn(e.getMessage());
+            }
+
+        }
+
+        return result;
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        if( configurationObject instanceof String )
+            jsonPath = JsonPath.compile((String)(configurationObject));
+
+        mapper.registerModule(new JsonOrgModule());
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
new file mode 100644
index 0000000..929b95b
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
@@ -0,0 +1,149 @@
+package org.apache.streams.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.jayway.jsonpath.JsonPath;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class JsonPathFilter implements StreamsProcessor {
+
+    public JsonPathFilter() {
+        System.out.println("creating JsonPathFilter");
+    }
+
+    private final static String STREAMS_ID = "JsonPathFilter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathFilter.class);
+
+    private ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private String pathExpression;
+    private JsonPath jsonPath;
+    private String destNodeName;
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        String json = null;
+
+        ObjectNode document = null;
+
+        LOGGER.debug("{} processing {}", STREAMS_ID);
+
+        if( entry.getDocument() instanceof ObjectNode ) {
+            document = (ObjectNode) entry.getDocument();
+            try {
+                json = mapper.writeValueAsString(document);
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+            }
+        } else if( entry.getDocument() instanceof String ) {
+            json = (String) entry.getDocument();
+            try {
+                document = mapper.readValue(json, ObjectNode.class);
+            } catch (IOException e) {
+                e.printStackTrace();
+                return null;
+            }
+        }
+
+        Preconditions.checkNotNull(document);
+
+        if( StringUtils.isNotEmpty(json)) {
+
+            Object srcResult = null;
+            try {
+                srcResult = jsonPath.read(json);
+
+            } catch( Exception e ) {
+                e.printStackTrace();
+                LOGGER.warn(e.getMessage());
+            }
+
+            Preconditions.checkNotNull(srcResult);
+
+            String[] path = StringUtils.split(pathExpression, '.');
+            ObjectNode node = document;
+            for (int i = 1; i < path.length-1; i++) {
+                node = (ObjectNode) document.get(path[i]);
+            }
+
+            Preconditions.checkNotNull(node);
+
+            if( srcResult instanceof JSONArray ) {
+                try {
+                    ArrayNode jsonNode = mapper.convertValue(srcResult, ArrayNode.class);
+                    if( jsonNode.size() == 1 ) {
+                        JsonNode item = jsonNode.get(0);
+                        node.set(destNodeName, item);
+                    } else {
+                        node.set(destNodeName, jsonNode);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    LOGGER.warn(e.getMessage());
+                }
+            } else if( srcResult instanceof JSONObject ) {
+                try {
+                    ObjectNode jsonNode = mapper.convertValue(srcResult, ObjectNode.class);
+                    node.set(destNodeName, jsonNode);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    LOGGER.warn(e.getMessage());
+                }
+            } else if( srcResult instanceof String ) {
+                try {
+                    node.put(destNodeName, (String) srcResult);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    LOGGER.warn(e.getMessage());
+                }
+            }
+
+        }
+
+        result.add(new StreamsDatum(document));
+
+        return result;
+
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        if( configurationObject instanceof Map) {
+            Map<String,String> params = ( Map<String,String>) configurationObject;
+            pathExpression = params.get("pathExpression");
+            jsonPath = JsonPath.compile(pathExpression);
+            destNodeName = pathExpression.substring(pathExpression.lastIndexOf(".") + 1);
+        }
+
+        mapper.registerModule(new JsonOrgModule());
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
new file mode 100644
index 0000000..2484939
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -0,0 +1,88 @@
+package org.apache.streams.json.test;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.json.JsonPathExtractor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class JsonPathExtractorTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractorTest.class);
+
+    private String testJson;
+
+    @Before
+    public void initialize() {
+        try {
+            testJson = FileUtils.readFileToString(new File("src/test/resources/books.json"));
+        } catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void test1()
+    {
+        JsonPathExtractor extractor = new JsonPathExtractor();
+        extractor.prepare("$.store.book[*].author");
+        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+        assertThat(result.size(), is(2));
+        assertTrue(result.get(0).getDocument() instanceof String);
+        assertTrue(result.get(1).getDocument() instanceof String);
+    }
+
+    @Ignore
+    @Test
+    public void test2()
+    {
+        JsonPathExtractor extractor = new JsonPathExtractor();
+        extractor.prepare("$.store.book[?(@.category == 'reference')]");
+        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+        assertThat(result.size(), is(1));
+        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+    }
+
+    @Ignore
+    @Test
+    public void test3()
+    {
+        JsonPathExtractor extractor = new JsonPathExtractor();
+        extractor.prepare("$.store.book[?(@.price > 10)]");
+        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+        assertThat(result.size(), is(1));
+        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+    }
+
+    @Ignore
+    @Test
+    public void test4()
+    {
+        JsonPathExtractor extractor = new JsonPathExtractor();
+        extractor.prepare("$.store.book[?(@.isbn)]");
+        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+        assertThat(result.size(), is(1));
+        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/src/test/resources/books.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/resources/books.json b/streams-contrib/streams-processor-json/src/test/resources/books.json
new file mode 100644
index 0000000..30cf954
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/test/resources/books.json
@@ -0,0 +1,21 @@
+{
+    "store": {
+        "book": [
+            { "category": "reference",
+                "author": "Nigel Rees",
+                "title": "Sayings of the Century",
+                "price": 8.95
+            },
+            { "category": "fiction",
+                "author": "Evelyn Waugh",
+                "title": "Sword of Honour",
+                "price": 12.99,
+                "isbn": "0-553-21311-3"
+            }
+        ],
+        "bicycle": {
+            "color": "red",
+            "price": 19.95
+        }
+    }
+}
\ No newline at end of file


[4/5] git commit: License headers

Posted by sb...@apache.org.
License headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d965ed98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d965ed98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d965ed98

Branch: refs/heads/master
Commit: d965ed987c7e556f752f58a53e37dbbc5a01ecfd
Parents: 65157bd
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon May 19 20:19:47 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon May 19 20:19:47 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/json/JsonPathFilter.java  |  4 ++--
 .../streams/json/test/JsonPathExtractorTest.java | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d965ed98/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
index 820e459..b8c10ab 100644
--- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
@@ -1,5 +1,3 @@
-package org.apache.streams.json;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,6 +17,8 @@ package org.apache.streams.json;
  * under the License.
  */
 
+package org.apache.streams.json;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d965ed98/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
index 7464ab1..2ab3b7f 100644
--- a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
+++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.json.test;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;


[5/5] git commit: Merge branch 'STREAMS-70'

Posted by sb...@apache.org.
Merge branch 'STREAMS-70'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/45dee67d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/45dee67d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/45dee67d

Branch: refs/heads/master
Commit: 45dee67d12aeeb46639b4d8c40ca8a46989af1d6
Parents: aa376fd d965ed9
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon May 19 20:20:54 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon May 19 20:20:54 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |   3 +-
 streams-contrib/streams-processor-json/pom.xml  |  86 ++++++++++
 .../apache/streams/json/JsonPathExtractor.java  | 140 +++++++++++++++
 .../org/apache/streams/json/JsonPathFilter.java | 171 +++++++++++++++++++
 .../json/test/JsonPathExtractorTest.java        | 101 +++++++++++
 .../src/test/resources/books.json               |  21 +++
 7 files changed, 522 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/45dee67d/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --cc streams-contrib/pom.xml
index 44e97c0,d6873bb..620f68e
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@@ -44,9 -44,9 +44,10 @@@
          <module>streams-persist-hdfs</module>
          <module>streams-persist-kafka</module>
          <module>streams-persist-mongo</module>
 +		<module>streams-amazon-aws</module>
          <!--<module>streams-processor-lucene</module>-->
          <!--<module>streams-processor-tika</module>-->
+         <module>streams-processor-json</module>
          <module>streams-processor-urls</module>
          <module>streams-provider-datasift</module>
          <module>streams-provider-facebook</module>


[2/5] git commit: resolves https://issues.apache.org/jira/browse/STREAMS-60

Posted by sb...@apache.org.
resolves https://issues.apache.org/jira/browse/STREAMS-60


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f24f2517
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f24f2517
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f24f2517

Branch: refs/heads/master
Commit: f24f2517672f57d1c118220be8eed7662b6136e5
Parents: f2383db
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue May 6 11:11:01 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue May 6 11:11:01 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f24f2517/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index b04350e..65bd6dc 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -4,6 +4,7 @@ import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
                         if( !Strings.isNullOrEmpty(line) ) {
                             reader.countersCurrent.incrementAttempt();
                             String[] fields = line.split(Character.toString(reader.DELIMITER));
-                            StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+                            StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2])));
                             write( entry );
                             reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                         }


[3/5] git commit: updated per pull request feedback

Posted by sb...@apache.org.
updated per pull request feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/65157bd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/65157bd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/65157bd4

Branch: refs/heads/master
Commit: 65157bd4cd642e4fa0b8e2f7236f9dd6cfd73b84
Parents: f24f251
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon May 19 14:49:51 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon May 19 14:49:51 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-processor-json/pom.xml  | 17 +++++++
 .../apache/streams/json/JsonPathExtractor.java  | 49 ++++++++++++++------
 .../org/apache/streams/json/JsonPathFilter.java | 40 ++++++++++++----
 .../json/test/JsonPathExtractorTest.java        | 12 ++---
 4 files changed, 86 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/65157bd4/streams-contrib/streams-processor-json/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/pom.xml b/streams-contrib/streams-processor-json/pom.xml
index 27695bc..ad22158 100644
--- a/streams-contrib/streams-processor-json/pom.xml
+++ b/streams-contrib/streams-processor-json/pom.xml
@@ -1,4 +1,21 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/65157bd4/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
index eab1da1..c4920d8 100644
--- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
@@ -1,5 +1,24 @@
 package org.apache.streams.json;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -19,19 +38,11 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * Created by sblackmon on 12/10/13.
+ * Provides a base implementation for extracting json fields and
+ * objects from datums using JsonPath syntax
  */
 public class JsonPathExtractor implements StreamsProcessor {
 
-    public JsonPathExtractor() {
-        System.out.println("creating JsonPathExtractor");
-    }
-
-    public JsonPathExtractor(String pathExpression) {
-        this.pathExpression = pathExpression;
-        System.out.println("creating JsonPathExtractor for " + this.pathExpression);
-    }
-
     private final static String STREAMS_ID = "JsonPathExtractor";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractor.class);
@@ -41,6 +52,15 @@ public class JsonPathExtractor implements StreamsProcessor {
     private String pathExpression;
     private JsonPath jsonPath;
 
+    public JsonPathExtractor() {
+        LOGGER.info("creating JsonPathExtractor");
+    }
+
+    public JsonPathExtractor(String pathExpression) {
+        this.pathExpression = pathExpression;
+        LOGGER.info("creating JsonPathExtractor for " + this.pathExpression);
+    }
+
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
 
@@ -55,7 +75,7 @@ public class JsonPathExtractor implements StreamsProcessor {
             try {
                 json = mapper.writeValueAsString(node);
             } catch (JsonProcessingException e) {
-                e.printStackTrace();
+                LOGGER.warn(e.getMessage());
             }
         } else if( entry.getDocument() instanceof String ) {
             json = (String) entry.getDocument();
@@ -85,7 +105,9 @@ public class JsonPathExtractor implements StreamsProcessor {
                             StreamsDatum matchDatum = new StreamsDatum(match);
                             result.add(matchDatum);
                         } else if ( item instanceof JSONObject ) {
-                            StreamsDatum matchDatum = new StreamsDatum(item);
+                            JSONObject match = (JSONObject) item;
+                            ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
+                            StreamsDatum matchDatum = new StreamsDatum(objectNode);
                             result.add(matchDatum);
                         }
                     }
@@ -94,7 +116,6 @@ public class JsonPathExtractor implements StreamsProcessor {
                 }
 
             } catch( Exception e ) {
-                e.printStackTrace();
                 LOGGER.warn(e.getMessage());
             }
 
@@ -114,6 +135,6 @@ public class JsonPathExtractor implements StreamsProcessor {
 
     @Override
     public void cleanUp() {
-
+        LOGGER.info("shutting down JsonPathExtractor for " + this.pathExpression);
     }
 };

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/65157bd4/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
index 929b95b..820e459 100644
--- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
@@ -1,5 +1,24 @@
 package org.apache.streams.json;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -23,14 +42,11 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Created by sblackmon on 12/10/13.
+ * Provides a base implementation for filtering datums which
+ * do not contain specific fields using JsonPath syntax
  */
 public class JsonPathFilter implements StreamsProcessor {
 
-    public JsonPathFilter() {
-        System.out.println("creating JsonPathFilter");
-    }
-
     private final static String STREAMS_ID = "JsonPathFilter";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathFilter.class);
@@ -41,6 +57,15 @@ public class JsonPathFilter implements StreamsProcessor {
     private JsonPath jsonPath;
     private String destNodeName;
 
+    public JsonPathFilter() {
+        LOGGER.info("creating JsonPathFilter");
+    }
+
+    public JsonPathFilter(String pathExpression) {
+        this.pathExpression = pathExpression;
+        LOGGER.info("creating JsonPathFilter for " + this.pathExpression);
+    }
+
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
 
@@ -102,7 +127,6 @@ public class JsonPathFilter implements StreamsProcessor {
                         node.set(destNodeName, jsonNode);
                     }
                 } catch (Exception e) {
-                    e.printStackTrace();
                     LOGGER.warn(e.getMessage());
                 }
             } else if( srcResult instanceof JSONObject ) {
@@ -110,14 +134,12 @@ public class JsonPathFilter implements StreamsProcessor {
                     ObjectNode jsonNode = mapper.convertValue(srcResult, ObjectNode.class);
                     node.set(destNodeName, jsonNode);
                 } catch (Exception e) {
-                    e.printStackTrace();
                     LOGGER.warn(e.getMessage());
                 }
             } else if( srcResult instanceof String ) {
                 try {
                     node.put(destNodeName, (String) srcResult);
                 } catch (Exception e) {
-                    e.printStackTrace();
                     LOGGER.warn(e.getMessage());
                 }
             }
@@ -144,6 +166,6 @@ public class JsonPathFilter implements StreamsProcessor {
 
     @Override
     public void cleanUp() {
-
+        LOGGER.info("shutting down JsonPathFilter for " + this.pathExpression);
     }
 };

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/65157bd4/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
index 2484939..7464ab1 100644
--- a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
+++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -19,12 +19,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
+ * Test for extracting json fields and
+ * objects from datums using JsonPath syntax
+ */
 public class JsonPathExtractorTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractorTest.class);
@@ -52,7 +49,6 @@ public class JsonPathExtractorTest {
         assertTrue(result.get(1).getDocument() instanceof String);
     }
 
-    @Ignore
     @Test
     public void test2()
     {
@@ -63,7 +59,6 @@ public class JsonPathExtractorTest {
         assertTrue(result.get(0).getDocument() instanceof ObjectNode);
     }
 
-    @Ignore
     @Test
     public void test3()
     {
@@ -74,7 +69,6 @@ public class JsonPathExtractorTest {
         assertTrue(result.get(0).getDocument() instanceof ObjectNode);
     }
 
-    @Ignore
     @Test
     public void test4()
     {