You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:02 UTC

[25/51] [abbrv] git commit: [streaming] JSONParseFlatMap added to examples

[streaming] JSONParseFlatMap added to examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ee7c4a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ee7c4a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ee7c4a83

Branch: refs/heads/master
Commit: ee7c4a8310ec7e3649a803edf42a532c580ea0b1
Parents: 799424d
Author: Eszes Dávid <es...@gmail.com>
Authored: Fri Aug 1 11:53:12 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:11 2014 +0200

----------------------------------------------------------------------
 .../flink-streaming-connectors/pom.xml          |  6 ++
 .../flink-streaming-core/pom.xml                |  8 ++
 .../examples/function/JSONParseFlatMap.java     | 99 ++++++++++++++++++++
 3 files changed, 113 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ee7c4a83/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index 8db610f..23e3fef 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -43,6 +43,12 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_2.10</artifactId>
 			<version>0.8.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ee7c4a83/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index 04b9372..73b8158 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -35,11 +35,19 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
+
 		<dependency>
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
 			<version>3.1</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.sling</groupId>
+			<artifactId>org.apache.sling.commons.json</artifactId>
+			<version>2.0.6</version>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ee7c4a83/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
new file mode 100644
index 0000000..267d035
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.examples.function;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+/**
+ * Abstract class derived from {@link FlatMapFunction} to handle JSON files.
+ * @param <IN>
+ * Type of the input elements.
+ * @param <OUT>
+ * Type of the returned elements.
+ */
+public abstract class JSONParseFlatMap<IN, OUT> extends
+		FlatMapFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+	private static final Log LOG = LogFactory.getLog(DataStream.class);
+
+	/**
+	 * Get the value of a field in a JSON text.
+	 * @param jsonText
+	 * The JSON text in which the field is searched. 
+	 * @param field
+	 * The field which is searched in the JSON text.
+	 * In case of embedded records fields have to be referred separated by dots.
+	 * @return
+	 * The value of the given field if it exists. Otherwise function returns with null.
+	 */
+	public String getField(String jsonText, String field) {
+		JSONObject jo = null;
+		try {
+			jo = new JSONObject(jsonText);
+		} catch (JSONException e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Input string is not proper", e);
+			}
+			return null;
+		}
+
+		try {
+			String[] fieldArray = field.split("[.]");
+			int length = fieldArray.length;
+
+			return findInnerField(jo, fieldArray, length).getString(
+					fieldArray[length - 1]);
+
+		} catch (JSONException e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Field " + field + " not found");
+			}
+		}
+		return null;
+	}
+
+	/**
+	 * Find an embedded JSON code associated with the given key (fieldArray).
+	 * @param jo
+	 * JSONObject in which we search.
+	 * @param fieldArray
+	 * String array identifying the field.
+	 * @param length
+	 * Length of the array.
+	 * @return
+	 * the searched embedded JSONObject if it exists. 
+	 * @throws JSONException
+	 * if the key is not found.
+	 */
+	private JSONObject findInnerField(JSONObject jo, String[] fieldArray,
+			int length) throws JSONException {
+
+		for (int i = 0; i <= length - 2; i++) {
+			jo = jo.getJSONObject(fieldArray[i]);
+		}
+		return jo;
+	}
+}