You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:02 UTC
[25/51] [abbrv] git commit: [streaming] JSONParseFlatMap added to
examples
[streaming] JSONParseFlatMap added to examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ee7c4a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ee7c4a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ee7c4a83
Branch: refs/heads/master
Commit: ee7c4a8310ec7e3649a803edf42a532c580ea0b1
Parents: 799424d
Author: Eszes Dávid <es...@gmail.com>
Authored: Fri Aug 1 11:53:12 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:11 2014 +0200
----------------------------------------------------------------------
.../flink-streaming-connectors/pom.xml | 6 ++
.../flink-streaming-core/pom.xml | 8 ++
.../examples/function/JSONParseFlatMap.java | 99 ++++++++++++++++++++
3 files changed, 113 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ee7c4a83/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index 8db610f..23e3fef 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -43,6 +43,12 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-examples</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ee7c4a83/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index 04b9372..73b8158 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -35,11 +35,19 @@ under the License.
<packaging>jar</packaging>
<dependencies>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.json</artifactId>
+ <version>2.0.6</version>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ee7c4a83/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
new file mode 100644
index 0000000..267d035
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.examples.function;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+/**
+ * Abstract class derived from {@link FlatMapFunction} to handle JSON text.
+ * @param <IN>
+ * Type of the input elements.
+ * @param <OUT>
+ * Type of the returned elements.
+ */
+public abstract class JSONParseFlatMap<IN, OUT> extends
+        FlatMapFunction<IN, OUT> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
+
+    /**
+     * Get the value of a field in a JSON text.
+     * @param jsonText
+     *            The JSON text in which the field is searched.
+     * @param field
+     *            The field which is searched in the JSON text. In case of
+     *            embedded records fields have to be referred separated by dots.
+     * @return The value of the given field if it exists. Returns null if an
+     *         argument is null, the text is not valid JSON, the field path is
+     *         empty or the field is not found.
+     */
+    public String getField(String jsonText, String field) {
+        if (jsonText == null || field == null) {
+            return null;
+        }
+
+        JSONObject jo;
+        try {
+            jo = new JSONObject(jsonText);
+        } catch (JSONException e) {
+            LOG.error("Input string is not proper", e);
+            return null;
+        }
+
+        // "." or ".." split into an empty array; there is no key to look up.
+        String[] fieldArray = field.split("[.]");
+        int length = fieldArray.length;
+        if (length == 0) {
+            LOG.error("Field " + field + " not found");
+            return null;
+        }
+
+        try {
+            return findInnerField(jo, fieldArray, length).getString(
+                    fieldArray[length - 1]);
+        } catch (JSONException e) {
+            LOG.error("Field " + field + " not found");
+            return null;
+        }
+    }
+
+    /**
+     * Walk down to the JSONObject that directly holds the last path element.
+     * @param jo JSONObject in which we search.
+     * @param fieldArray Path elements identifying the field.
+     * @param length Number of path elements in fieldArray.
+     * @return The object reached by following all but the last path element.
+     * @throws JSONException if an intermediate key cannot be resolved.
+     */
+    private JSONObject findInnerField(JSONObject jo, String[] fieldArray,
+            int length) throws JSONException {
+
+        for (int i = 0; i <= length - 2; i++) {
+            jo = jo.getJSONObject(fieldArray[i]);
+        }
+        return jo;
+    }
+}