You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/20 03:21:14 UTC
[1/5] git commit: Resolves
https://issues.apache.org/jira/browse/STREAMS-70
Repository: incubator-streams
Updated Branches:
refs/heads/master aa376fdf4 -> 45dee67d1
Resolves https://issues.apache.org/jira/browse/STREAMS-70
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f2383db8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f2383db8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f2383db8
Branch: refs/heads/master
Commit: f2383db8f4cfddff1145a93025a7c9051aeaa3af
Parents: e6ffe29
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon May 5 13:31:49 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon May 5 13:31:49 2014 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
streams-contrib/streams-processor-json/pom.xml | 69 +++++++++
.../apache/streams/json/JsonPathExtractor.java | 119 +++++++++++++++
.../org/apache/streams/json/JsonPathFilter.java | 149 +++++++++++++++++++
.../json/test/JsonPathExtractorTest.java | 88 +++++++++++
.../src/test/resources/books.json | 21 +++
6 files changed, 447 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index d80fc63..d6873bb 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -46,6 +46,7 @@
<module>streams-persist-mongo</module>
<!--<module>streams-processor-lucene</module>-->
<!--<module>streams-processor-tika</module>-->
+ <module>streams-processor-json</module>
<module>streams-processor-urls</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/pom.xml b/streams-contrib/streams-processor-json/pom.xml
new file mode 100644
index 0000000..27695bc
--- /dev/null
+++ b/streams-contrib/streams-processor-json/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>streams-processor-json</artifactId>
+ <version>0.1-SNAPSHOT</version>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-contrib</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path-assert</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
new file mode 100644
index 0000000..eab1da1
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
@@ -0,0 +1,119 @@
+package org.apache.streams.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import com.jayway.jsonpath.JsonPath;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class JsonPathExtractor implements StreamsProcessor {
+
+ public JsonPathExtractor() {
+ System.out.println("creating JsonPathExtractor");
+ }
+
+ public JsonPathExtractor(String pathExpression) {
+ this.pathExpression = pathExpression;
+ System.out.println("creating JsonPathExtractor for " + this.pathExpression);
+ }
+
+ private final static String STREAMS_ID = "JsonPathExtractor";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractor.class);
+
+ private ObjectMapper mapper = new StreamsJacksonMapper();
+
+ private String pathExpression;
+ private JsonPath jsonPath;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ String json = null;
+
+ LOGGER.debug("{} processing {}", STREAMS_ID);
+
+ if( entry.getDocument() instanceof ObjectNode ) {
+ ObjectNode node = (ObjectNode) entry.getDocument();
+ try {
+ json = mapper.writeValueAsString(node);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ } else if( entry.getDocument() instanceof String ) {
+ json = (String) entry.getDocument();
+ }
+
+ if( StringUtils.isNotEmpty(json)) {
+
+ try {
+ Object readResult = jsonPath.read(json);
+
+ if (readResult instanceof String) {
+ String match = (String) readResult;
+ StreamsDatum matchDatum = new StreamsDatum(match);
+ result.add(matchDatum);
+ } else if (readResult instanceof JSONObject) {
+ JSONObject match = (JSONObject) readResult;
+ ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
+ StreamsDatum matchDatum = new StreamsDatum(objectNode);
+ result.add(matchDatum);
+ } else if (readResult instanceof JSONArray) {
+ JSONArray array = (JSONArray) readResult;
+ Iterator iterator = array.iterator();
+ while (iterator.hasNext()) {
+ Object item = iterator.next();
+ if( item instanceof String ) {
+ String match = (String) item;
+ StreamsDatum matchDatum = new StreamsDatum(match);
+ result.add(matchDatum);
+ } else if ( item instanceof JSONObject ) {
+ StreamsDatum matchDatum = new StreamsDatum(item);
+ result.add(matchDatum);
+ }
+ }
+ } else {
+
+ }
+
+ } catch( Exception e ) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ }
+
+ }
+
+ return result;
+
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ if( configurationObject instanceof String )
+ jsonPath = JsonPath.compile((String)(configurationObject));
+
+ mapper.registerModule(new JsonOrgModule());
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
new file mode 100644
index 0000000..929b95b
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
@@ -0,0 +1,149 @@
+package org.apache.streams.json;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.jayway.jsonpath.JsonPath;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class JsonPathFilter implements StreamsProcessor {
+
+ public JsonPathFilter() {
+ System.out.println("creating JsonPathFilter");
+ }
+
+ private final static String STREAMS_ID = "JsonPathFilter";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathFilter.class);
+
+ private ObjectMapper mapper = new StreamsJacksonMapper();
+
+ private String pathExpression;
+ private JsonPath jsonPath;
+ private String destNodeName;
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ String json = null;
+
+ ObjectNode document = null;
+
+ LOGGER.debug("{} processing {}", STREAMS_ID);
+
+ if( entry.getDocument() instanceof ObjectNode ) {
+ document = (ObjectNode) entry.getDocument();
+ try {
+ json = mapper.writeValueAsString(document);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ } else if( entry.getDocument() instanceof String ) {
+ json = (String) entry.getDocument();
+ try {
+ document = mapper.readValue(json, ObjectNode.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ Preconditions.checkNotNull(document);
+
+ if( StringUtils.isNotEmpty(json)) {
+
+ Object srcResult = null;
+ try {
+ srcResult = jsonPath.read(json);
+
+ } catch( Exception e ) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ }
+
+ Preconditions.checkNotNull(srcResult);
+
+ String[] path = StringUtils.split(pathExpression, '.');
+ ObjectNode node = document;
+ for (int i = 1; i < path.length-1; i++) {
+ node = (ObjectNode) document.get(path[i]);
+ }
+
+ Preconditions.checkNotNull(node);
+
+ if( srcResult instanceof JSONArray ) {
+ try {
+ ArrayNode jsonNode = mapper.convertValue(srcResult, ArrayNode.class);
+ if( jsonNode.size() == 1 ) {
+ JsonNode item = jsonNode.get(0);
+ node.set(destNodeName, item);
+ } else {
+ node.set(destNodeName, jsonNode);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ }
+ } else if( srcResult instanceof JSONObject ) {
+ try {
+ ObjectNode jsonNode = mapper.convertValue(srcResult, ObjectNode.class);
+ node.set(destNodeName, jsonNode);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ }
+ } else if( srcResult instanceof String ) {
+ try {
+ node.put(destNodeName, (String) srcResult);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ }
+ }
+
+ }
+
+ result.add(new StreamsDatum(document));
+
+ return result;
+
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ if( configurationObject instanceof Map) {
+ Map<String,String> params = ( Map<String,String>) configurationObject;
+ pathExpression = params.get("pathExpression");
+ jsonPath = JsonPath.compile(pathExpression);
+ destNodeName = pathExpression.substring(pathExpression.lastIndexOf(".") + 1);
+ }
+
+ mapper.registerModule(new JsonOrgModule());
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
new file mode 100644
index 0000000..2484939
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -0,0 +1,88 @@
+package org.apache.streams.json.test;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.json.JsonPathExtractor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class JsonPathExtractorTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractorTest.class);
+
+ private String testJson;
+
+ @Before
+ public void initialize() {
+ try {
+ testJson = FileUtils.readFileToString(new File("src/test/resources/books.json"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void test1()
+ {
+ JsonPathExtractor extractor = new JsonPathExtractor();
+ extractor.prepare("$.store.book[*].author");
+ List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+ assertThat(result.size(), is(2));
+ assertTrue(result.get(0).getDocument() instanceof String);
+ assertTrue(result.get(1).getDocument() instanceof String);
+ }
+
+ @Ignore
+ @Test
+ public void test2()
+ {
+ JsonPathExtractor extractor = new JsonPathExtractor();
+ extractor.prepare("$.store.book[?(@.category == 'reference')]");
+ List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+ assertThat(result.size(), is(1));
+ assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+ }
+
+ @Ignore
+ @Test
+ public void test3()
+ {
+ JsonPathExtractor extractor = new JsonPathExtractor();
+ extractor.prepare("$.store.book[?(@.price > 10)]");
+ List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+ assertThat(result.size(), is(1));
+ assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+ }
+
+ @Ignore
+ @Test
+ public void test4()
+ {
+ JsonPathExtractor extractor = new JsonPathExtractor();
+ extractor.prepare("$.store.book[?(@.isbn)]");
+ List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+ assertThat(result.size(), is(1));
+ assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2383db8/streams-contrib/streams-processor-json/src/test/resources/books.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/resources/books.json b/streams-contrib/streams-processor-json/src/test/resources/books.json
new file mode 100644
index 0000000..30cf954
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/test/resources/books.json
@@ -0,0 +1,21 @@
+{
+ "store": {
+ "book": [
+ { "category": "reference",
+ "author": "Nigel Rees",
+ "title": "Sayings of the Century",
+ "price": 8.95
+ },
+ { "category": "fiction",
+ "author": "Evelyn Waugh",
+ "title": "Sword of Honour",
+ "price": 12.99,
+ "isbn": "0-553-21311-3"
+ }
+ ],
+ "bicycle": {
+ "color": "red",
+ "price": 19.95
+ }
+ }
+}
\ No newline at end of file
[4/5] git commit: License headers
Posted by sb...@apache.org.
License headers
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d965ed98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d965ed98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d965ed98
Branch: refs/heads/master
Commit: d965ed987c7e556f752f58a53e37dbbc5a01ecfd
Parents: 65157bd
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon May 19 20:19:47 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon May 19 20:19:47 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/json/JsonPathFilter.java | 4 ++--
.../streams/json/test/JsonPathExtractorTest.java | 19 +++++++++++++++++++
2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d965ed98/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
index 820e459..b8c10ab 100644
--- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
@@ -1,5 +1,3 @@
-package org.apache.streams.json;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +17,8 @@ package org.apache.streams.json;
* under the License.
*/
+package org.apache.streams.json;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d965ed98/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
index 7464ab1..2ab3b7f 100644
--- a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
+++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.json.test;
import com.fasterxml.jackson.databind.node.ObjectNode;
[5/5] git commit: Merge branch 'STREAMS-70'
Posted by sb...@apache.org.
Merge branch 'STREAMS-70'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/45dee67d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/45dee67d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/45dee67d
Branch: refs/heads/master
Commit: 45dee67d12aeeb46639b4d8c40ca8a46989af1d6
Parents: aa376fd d965ed9
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon May 19 20:20:54 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon May 19 20:20:54 2014 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
.../streams/hdfs/WebHdfsPersistReaderTask.java | 3 +-
streams-contrib/streams-processor-json/pom.xml | 86 ++++++++++
.../apache/streams/json/JsonPathExtractor.java | 140 +++++++++++++++
.../org/apache/streams/json/JsonPathFilter.java | 171 +++++++++++++++++++
.../json/test/JsonPathExtractorTest.java | 101 +++++++++++
.../src/test/resources/books.json | 21 +++
7 files changed, 522 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/45dee67d/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --cc streams-contrib/pom.xml
index 44e97c0,d6873bb..620f68e
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@@ -44,9 -44,9 +44,10 @@@
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
+ <module>streams-amazon-aws</module>
<!--<module>streams-processor-lucene</module>-->
<!--<module>streams-processor-tika</module>-->
+ <module>streams-processor-json</module>
<module>streams-processor-urls</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
[2/5] git commit: resolves
https://issues.apache.org/jira/browse/STREAMS-60
Posted by sb...@apache.org.
resolves https://issues.apache.org/jira/browse/STREAMS-60
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f24f2517
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f24f2517
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f24f2517
Branch: refs/heads/master
Commit: f24f2517672f57d1c118220be8eed7662b6136e5
Parents: f2383db
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue May 6 11:11:01 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue May 6 11:11:01 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f24f2517/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index b04350e..65bd6dc 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -4,6 +4,7 @@ import com.google.common.base.Strings;
import org.apache.hadoop.fs.FileStatus;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
if( !Strings.isNullOrEmpty(line) ) {
reader.countersCurrent.incrementAttempt();
String[] fields = line.split(Character.toString(reader.DELIMITER));
- StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+ StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2])));
write( entry );
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
}
[3/5] git commit: updated per pull request feedback
Posted by sb...@apache.org.
updated per pull request feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/65157bd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/65157bd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/65157bd4
Branch: refs/heads/master
Commit: 65157bd4cd642e4fa0b8e2f7236f9dd6cfd73b84
Parents: f24f251
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon May 19 14:49:51 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon May 19 14:49:51 2014 -0500
----------------------------------------------------------------------
streams-contrib/streams-processor-json/pom.xml | 17 +++++++
.../apache/streams/json/JsonPathExtractor.java | 49 ++++++++++++++------
.../org/apache/streams/json/JsonPathFilter.java | 40 ++++++++++++----
.../json/test/JsonPathExtractorTest.java | 12 ++---
4 files changed, 86 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/65157bd4/streams-contrib/streams-processor-json/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/pom.xml b/streams-contrib/streams-processor-json/pom.xml
index 27695bc..ad22158 100644
--- a/streams-contrib/streams-processor-json/pom.xml
+++ b/streams-contrib/streams-processor-json/pom.xml
@@ -1,4 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/65157bd4/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
index eab1da1..c4920d8 100644
--- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
@@ -1,5 +1,24 @@
package org.apache.streams.json;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -19,19 +38,11 @@ import java.util.Iterator;
import java.util.List;
/**
- * Created by sblackmon on 12/10/13.
+ * Provides a base implementation for extracting json fields and
+ * objects from datums using JsonPath syntax
*/
public class JsonPathExtractor implements StreamsProcessor {
- public JsonPathExtractor() {
- System.out.println("creating JsonPathExtractor");
- }
-
- public JsonPathExtractor(String pathExpression) {
- this.pathExpression = pathExpression;
- System.out.println("creating JsonPathExtractor for " + this.pathExpression);
- }
-
private final static String STREAMS_ID = "JsonPathExtractor";
private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractor.class);
@@ -41,6 +52,15 @@ public class JsonPathExtractor implements StreamsProcessor {
private String pathExpression;
private JsonPath jsonPath;
+ public JsonPathExtractor() {
+ LOGGER.info("creating JsonPathExtractor");
+ }
+
+ public JsonPathExtractor(String pathExpression) {
+ this.pathExpression = pathExpression;
+ LOGGER.info("creating JsonPathExtractor for " + this.pathExpression);
+ }
+
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
@@ -55,7 +75,7 @@ public class JsonPathExtractor implements StreamsProcessor {
try {
json = mapper.writeValueAsString(node);
} catch (JsonProcessingException e) {
- e.printStackTrace();
+ LOGGER.warn(e.getMessage());
}
} else if( entry.getDocument() instanceof String ) {
json = (String) entry.getDocument();
@@ -85,7 +105,9 @@ public class JsonPathExtractor implements StreamsProcessor {
StreamsDatum matchDatum = new StreamsDatum(match);
result.add(matchDatum);
} else if ( item instanceof JSONObject ) {
- StreamsDatum matchDatum = new StreamsDatum(item);
+ JSONObject match = (JSONObject) item;
+ ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
+ StreamsDatum matchDatum = new StreamsDatum(objectNode);
result.add(matchDatum);
}
}
@@ -94,7 +116,6 @@ public class JsonPathExtractor implements StreamsProcessor {
}
} catch( Exception e ) {
- e.printStackTrace();
LOGGER.warn(e.getMessage());
}
@@ -114,6 +135,6 @@ public class JsonPathExtractor implements StreamsProcessor {
@Override
public void cleanUp() {
-
+ LOGGER.info("shutting down JsonPathExtractor for " + this.pathExpression);
}
};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/65157bd4/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
index 929b95b..820e459 100644
--- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
@@ -1,5 +1,24 @@
package org.apache.streams.json;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -23,14 +42,11 @@ import java.util.List;
import java.util.Map;
/**
- * Created by sblackmon on 12/10/13.
+ * Provides a base implementation for filtering datums which
+ * do not contain specific fields using JsonPath syntax
*/
public class JsonPathFilter implements StreamsProcessor {
- public JsonPathFilter() {
- System.out.println("creating JsonPathFilter");
- }
-
private final static String STREAMS_ID = "JsonPathFilter";
private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathFilter.class);
@@ -41,6 +57,15 @@ public class JsonPathFilter implements StreamsProcessor {
private JsonPath jsonPath;
private String destNodeName;
+ public JsonPathFilter() {
+ LOGGER.info("creating JsonPathFilter");
+ }
+
+ public JsonPathFilter(String pathExpression) {
+ this.pathExpression = pathExpression;
+ LOGGER.info("creating JsonPathFilter for " + this.pathExpression);
+ }
+
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
@@ -102,7 +127,6 @@ public class JsonPathFilter implements StreamsProcessor {
node.set(destNodeName, jsonNode);
}
} catch (Exception e) {
- e.printStackTrace();
LOGGER.warn(e.getMessage());
}
} else if( srcResult instanceof JSONObject ) {
@@ -110,14 +134,12 @@ public class JsonPathFilter implements StreamsProcessor {
ObjectNode jsonNode = mapper.convertValue(srcResult, ObjectNode.class);
node.set(destNodeName, jsonNode);
} catch (Exception e) {
- e.printStackTrace();
LOGGER.warn(e.getMessage());
}
} else if( srcResult instanceof String ) {
try {
node.put(destNodeName, (String) srcResult);
} catch (Exception e) {
- e.printStackTrace();
LOGGER.warn(e.getMessage());
}
}
@@ -144,6 +166,6 @@ public class JsonPathFilter implements StreamsProcessor {
@Override
public void cleanUp() {
-
+ LOGGER.info("shutting down JsonPathFilter for " + this.pathExpression);
}
};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/65157bd4/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
index 2484939..7464ab1 100644
--- a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
+++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -19,12 +19,9 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
+ * Test for extracting json fields and
+ * objects from datums using JsonPath syntax
+ */
public class JsonPathExtractorTest {
private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractorTest.class);
@@ -52,7 +49,6 @@ public class JsonPathExtractorTest {
assertTrue(result.get(1).getDocument() instanceof String);
}
- @Ignore
@Test
public void test2()
{
@@ -63,7 +59,6 @@ public class JsonPathExtractorTest {
assertTrue(result.get(0).getDocument() instanceof ObjectNode);
}
- @Ignore
@Test
public void test3()
{
@@ -74,7 +69,6 @@ public class JsonPathExtractorTest {
assertTrue(result.get(0).getDocument() instanceof ObjectNode);
}
- @Ignore
@Test
public void test4()
{