You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:11:01 UTC
[18/18] git commit: [streaming] Updated logging to utilize SLF4J
[streaming] Updated logging to utilize SLF4J
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d0dd5138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d0dd5138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d0dd5138
Branch: refs/heads/master
Commit: d0dd5138fd0bddf2bf942bffee1681c298043b3e
Parents: 5f601cf
Author: ghermann <re...@gmail.com>
Authored: Wed Sep 10 11:58:36 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200
----------------------------------------------------------------------
.../flink-streaming-connectors/pom.xml | 10 +-
.../streaming/connectors/flume/FlumeSink.java | 2 +-
.../connectors/flume/FlumeTopology.java | 30 +--
.../connectors/json/JSONParseFlatMap.java | 144 +++++++++++++
.../streaming/connectors/json/JSONParser.java | 175 +++++++++++++++
.../connectors/kafka/KafkaTopology.java | 12 +-
.../streaming/connectors/rabbitmq/RMQSink.java | 2 +-
.../connectors/rabbitmq/RMQSource.java | 12 +-
.../connectors/rabbitmq/RMQTopology.java | 15 +-
.../connectors/twitter/TwitterLocal.java | 2 +-
.../connectors/twitter/TwitterSource.java | 71 ++++---
.../connectors/twitter/TwitterStreaming.java | 8 +-
.../connectors/json/JSONParserTest.java | 74 +++++++
.../connectors/json/JSONParserTest2.java | 95 +++++++++
.../flink/streaming/api/JobGraphBuilder.java | 211 ++++++++-----------
.../api/collector/DirectedStreamCollector.java | 11 +-
.../api/collector/StreamCollector.java | 9 +-
.../environment/RemoteStreamEnvironment.java | 2 +-
.../api/invokable/StreamOperatorInvokable.java | 4 +-
.../api/invokable/operator/co/CoInvokable.java | 14 +-
.../api/streamcomponent/OutputHandler.java | 24 +--
.../streamcomponent/StreamIterationSink.java | 13 +-
.../streamcomponent/StreamIterationSource.java | 2 +-
.../api/streamcomponent/StreamSink.java | 4 +-
.../flink/streaming/util/TestDataUtil.java | 8 +-
.../flink-streaming-examples/pom.xml | 6 +
.../examples/function/JSONParseFlatMap.java | 144 -------------
.../streaming/examples/function/JSONParser.java | 175 ---------------
.../examples/function/JSONParserTest.java | 73 -------
.../examples/function/JSONParserTest2.java | 94 ---------
30 files changed, 714 insertions(+), 732 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index ee99d7f..00fc675 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -43,12 +43,6 @@ under the License.
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-examples</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
@@ -65,6 +59,10 @@ under the License.
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
</exclusions>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index c4618a7..6bc5d8a 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -110,7 +110,7 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
Thread.sleep(1000);
} catch (InterruptedException e1) {
if (LOG.isErrorEnabled()) {
- LOG.error("Interrupted while trying to connect " + port + " at " + host);
+ LOG.error("Interrupted while trying to connect {} at {}", port, host);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 38ea6ef..73668c6 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -18,14 +18,15 @@
package org.apache.flink.streaming.connectors.flume;
import org.apache.commons.lang.SerializationUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
public class FlumeTopology {
- private static final Log LOG = LogFactory.getLog(FlumeTopology.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FlumeTopology.class);
+
public static class MyFlumeSink extends FlumeSink<String> {
private static final long serialVersionUID = 1L;
@@ -39,8 +40,8 @@ public class FlumeTopology {
try {
sendAndClose();
} catch (Exception e) {
- throw new RuntimeException("Error while closing Flume connection with " + port + " at "
- + host, e);
+ throw new RuntimeException("Error while closing Flume connection with " + port
+ + " at " + host, e);
}
}
return SerializationUtils.serialize(tuple);
@@ -53,12 +54,13 @@ public class FlumeTopology {
@Override
public void invoke(String value) {
- LOG.info("String: <" + value + "> arrived from Flume");
-
+ if (LOG.isInfoEnabled()) {
+ LOG.info("String: <{}> arrived from Flume", value);
+ }
}
-
+
}
-
+
public static class MyFlumeSource extends FlumeSource<String> {
private static final long serialVersionUID = 1L;
@@ -82,14 +84,12 @@ public class FlumeTopology {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<String> dataStream1 = env
- .addSource(new MyFlumeSource("localhost", 41414))
- .addSink(new MyFlumePrintSink());
+ DataStream<String> dataStream1 = env.addSource(new MyFlumeSource("localhost", 41414))
+ .addSink(new MyFlumePrintSink());
@SuppressWarnings("unused")
- DataStream<String> dataStream2 = env
- .fromElements("one", "two", "three", "four", "five", "q")
- .addSink(new MyFlumeSink("localhost", 42424));
+ DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
+ "q").addSink(new MyFlumeSink("localhost", 42424));
env.execute();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
new file mode 100644
index 0000000..96b1bf7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * Abstract {@link RichFlatMapFunction} with helpers to read fields of JSON text.
+ *
+ * @param <IN>
+ * Type of the input elements.
+ * @param <OUT>
+ * Type of the returned elements.
+ */
+public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ // private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
+
+ /**
+ * Get the value object associated with a key from a JSON string. It can find
+ * embedded fields, too. A new {@link JSONParser} is built on every call.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The object associated with the field.
+ * @throws JSONException
+ * If the JSON text is invalid or the field is not found.
+ */
+ public Object get(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).get("retValue");
+ }
+
+ /**
+ * Get the boolean value associated with a key from a JSON string. It can find
+ * embedded fields, too. A new {@link JSONParser} is built on every call.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The boolean value associated with the field.
+ * @throws JSONException
+ * If the JSON text is invalid or the field is not found.
+ */
+ public boolean getBoolean(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getBoolean("retValue");
+ }
+
+ /**
+ * Get the double value associated with a key from a JSON string. It can find
+ * embedded fields, too. A new {@link JSONParser} is built on every call.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The double value associated with the field.
+ * @throws JSONException
+ * If the JSON text is invalid or the field is not found.
+ */
+ public double getDouble(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getDouble("retValue");
+ }
+
+ /**
+ * Get the int value associated with a key from a JSON string. It can find
+ * embedded fields, too. A new {@link JSONParser} is built on every call.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The int value associated with the field.
+ * @throws JSONException
+ * If the JSON text is invalid or the field is not found.
+ */
+ public int getInt(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getInt("retValue");
+ }
+
+ /**
+ * Get the long value associated with a key from a JSON string. It can find
+ * embedded fields, too. A new {@link JSONParser} is built on every call.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The long value associated with the field.
+ * @throws JSONException
+ * If the JSON text is invalid or the field is not found.
+ */
+ public long getLong(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getLong("retValue");
+ }
+
+ /**
+ * Get the String value associated with a key from a JSON string. It can find
+ * embedded fields, too. A new {@link JSONParser} is built on every call.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The String value associated with the field.
+ * @throws JSONException
+ * If the JSON text is invalid or the field is not found.
+ */
+ public String getString(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getString("retValue");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
new file mode 100644
index 0000000..4e34483
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+/**
+ * A JSONParser wraps a JSONObject and gives access to embedded fields via a
+ * dot-separated key whose parts may end in array indices, e.g. "a.b[0][1]".
+ */
+public class JSONParser {
+
+ private JSONObject originalJO;
+ private String searchedfield;
+ private Object temp;
+
+ /**
+ * Construct a JSONParser from a string. The string has to be the JSON text
+ * from which we want to get a field.
+ *
+ * @param jsonText
+ * String representation of the JSON object in which fields are
+ * searched.
+ * @throws JSONException
+ * If there is a syntax error in the source string.
+ */
+ public JSONParser(String jsonText) throws JSONException {
+ originalJO = new JSONObject(jsonText);
+ }
+
+ /**
+ * Parse the JSON text passed to the constructor to find the given key.
+ * Every call starts the search again from a copy of the root object.
+ *
+ * @param key
+ * The key whose value is searched for.
+ * @return A JSONObject which has only one field called "retValue" and the
+ * value associated with it is the searched value. The methods of
+ * JSONObject can be used to get the field value in a desired
+ * format.
+ * @throws JSONException
+ * If the key is not found.
+ */
+ public JSONObject parse(String key) throws JSONException {
+ initializeParser(key);
+ parsing();
+ return putResultInJSONObj();
+ }
+
+ /**
+ * Store the key and reset the search state to a copy of the root object.
+ *
+ * @param key
+ * The key whose value is searched for.
+ * @throws JSONException
+ * If the copy of the original object cannot be created.
+ */
+ private void initializeParser(String key) throws JSONException {
+ searchedfield = key;
+ temp = new JSONObject(originalJO.toString());
+ }
+
+ /**
+ * Split the searched field at the '.' characters and resolve the resulting
+ * parts one after the other, each one relative to the previous result.
+ *
+ * @throws JSONException
+ * If the key is not found.
+ */
+ private void parsing() throws JSONException {
+ StringTokenizer st = new StringTokenizer(searchedfield, ".");
+ while (st.hasMoreTokens()) {
+ find(st.nextToken());
+ }
+ }
+
+ /**
+ * Search for the next part of the field and update the state if it was
+ * found. Parts ending with ']' are handled as array accesses.
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @throws JSONException
+ * If the key is not found.
+ */
+ private void find(String nextToken) throws JSONException {
+ if (endsWithBracket(nextToken)) {
+ treatAllBracket(nextToken);
+ } else {
+ temp = ((JSONObject) temp).get(nextToken);
+ }
+ }
+
+ /**
+ * Determine whether the given string ends with a closing square bracket ']'.
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @return True if the given string ends with a closing square bracket ']'
+ * and false otherwise.
+ */
+ private boolean endsWithBracket(String nextToken) {
+ return nextToken.substring(nextToken.length() - 1).endsWith("]");
+ }
+
+ /**
+ * Handle (multidimensional) arrays: the text before the first '[' is looked
+ * up as a key, every following "n]" piece is used as an array index.
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @throws JSONException
+ * If the searched element is not found.
+ */
+ private void treatAllBracket(String nextToken) throws JSONException {
+ List<String> list = Arrays.asList(nextToken.split("\\["));
+ ListIterator<String> iter = list.listIterator();
+
+ temp = ((JSONObject) temp).get(iter.next());
+
+ while (iter.hasNext()) {
+ int index = Integer.parseInt(cutBracket(iter.next()));
+ temp = ((JSONArray) temp).get(index);
+ }
+ }
+
+ /**
+ * Remove the last character of the string.
+ *
+ * @param string
+ * String to modify.
+ * @return The given string without the last character.
+ */
+ private String cutBracket(String string) {
+ return string.substring(0, string.length() - 1);
+ }
+
+ /**
+ * Save the result of the search into a JSONObject.
+ *
+ * @return A special JSONObject which contains only one key, "retValue". The
+ * value associated with this key is the result of the search.
+ * @throws JSONException
+ * If there is a problem creating the JSONObject. (e.g. invalid
+ * syntax)
+ */
+ private JSONObject putResultInJSONObj() throws JSONException {
+ JSONObject jo = new JSONObject();
+ jo.put("retValue", temp);
+ return jo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 2c89471..64ea810 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -18,16 +18,16 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KafkaTopology {
- private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
public static final class MySource implements SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@@ -82,7 +82,9 @@ public class KafkaTopology {
@Override
public void invoke(Tuple1<String> value) {
- LOG.info("String: " + value + " arrived from Kafka");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("String: <{}> arrived from Kafka", value);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index ae04298..22e9aae 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -84,7 +84,7 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
}
} catch (IOException e) {
if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
+ LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 1fcd57b..4f2feba 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -19,11 +19,11 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
-import net.spy.memcached.compat.log.Logger;
-import net.spy.memcached.compat.log.LoggerFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -86,7 +86,9 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
try {
delivery = consumer.nextDelivery();
} catch (Exception e) {
- LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+ }
}
outTuple = deserialize(delivery.getBody());
@@ -103,8 +105,8 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
try {
connection.close();
} catch (IOException e) {
- throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME + " at "
- + HOST_NAME, e);
+ throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+ + " at " + HOST_NAME, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index 6cdda17..6d343dc 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -18,15 +18,15 @@
package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.commons.lang.SerializationUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RMQTopology {
- private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
public static final class MyRMQSink extends RMQSink<String> {
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
@@ -50,8 +50,9 @@ public class RMQTopology {
@Override
public void invoke(String value) {
- LOG.info("String: <" + value + "> arrived from RMQ");
-
+ if (LOG.isInfoEnabled()) {
+ LOG.info("String: <{}> arrived from RMQ", value);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 06a1308..465a500 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 4aa7a43..525f4c8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -41,8 +41,8 @@ import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
/**
- * Implementation of {@link SourceFunction} specialized to emit tweets from Twitter.
- * It can connect to Twitter Streaming API, collect tweets and
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. It can connect to Twitter Streaming API, collect tweets and
*/
public class TwitterSource extends RichSourceFunction<String> {
@@ -60,8 +60,10 @@ public class TwitterSource extends RichSourceFunction<String> {
/**
* Create {@link TwitterSource} for streaming
+ *
* @param authPath
- * Location of the properties file containing the required authentication information.
+ * Location of the properties file containing the required
+ * authentication information.
*/
public TwitterSource(String authPath) {
this.authPath = authPath;
@@ -69,10 +71,11 @@ public class TwitterSource extends RichSourceFunction<String> {
}
/**
- * Create {@link TwitterSource} to
- * collect finite number of tweets
+ * Create {@link TwitterSource} to collect finite number of tweets
+ *
* @param authPath
- * Location of the properties file containing the required authentication information.
+ * Location of the properties file containing the required
+ * authentication information.
* @param numberOfTweets
*
*/
@@ -86,17 +89,17 @@ public class TwitterSource extends RichSourceFunction<String> {
public void open(Configuration parameters) throws Exception {
initializeConnection();
}
-
+
@Override
public void invoke(Collector<String> collector) throws Exception {
-
+
if (streaming) {
collectMessages(collector);
} else {
collectFiniteMessages(collector);
}
}
-
+
@Override
public void close() throws Exception {
closeConnection();
@@ -136,9 +139,9 @@ public class TwitterSource extends RichSourceFunction<String> {
}
/**
- * Reads the given properties file for the authentication data.
- * @return
- * the authentication data.
+ * Reads the given properties file for the authentication data.
+ *
+ * @return the authentication data.
*/
private Properties loadAuthenticationProperties() {
Properties properties = new Properties();
@@ -147,18 +150,15 @@ public class TwitterSource extends RichSourceFunction<String> {
properties.load(input);
input.close();
} catch (IOException ioe) {
- new RuntimeException("Cannot open .properties file: " + authPath,
- ioe);
+ new RuntimeException("Cannot open .properties file: " + authPath, ioe);
}
return properties;
}
- private void initializeClient(StatusesSampleEndpoint endpoint,
- Authentication auth) {
+ private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) {
- client = new ClientBuilder().name("twitterSourceClient")
- .hosts(Constants.STREAM_HOST).endpoint(endpoint)
- .authentication(auth)
+ client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
+ .endpoint(endpoint).authentication(auth)
.processor(new StringDelimitedProcessor(queue)).build();
client.connect();
@@ -166,8 +166,9 @@ public class TwitterSource extends RichSourceFunction<String> {
/**
* Put tweets into collector
+ *
* @param collector
- * Collector in which the tweets are collected.
+ * Collector in which the tweets are collected.
*/
protected void collectFiniteMessages(Collector<String> collector) {
@@ -186,8 +187,9 @@ public class TwitterSource extends RichSourceFunction<String> {
/**
* Put tweets into collector
+ *
* @param collector
- * Collector in which the tweets are collected.
+ * Collector in which the tweets are collected.
*/
protected void collectMessages(Collector<String> collector) {
@@ -202,14 +204,15 @@ public class TwitterSource extends RichSourceFunction<String> {
/**
* Put one tweet into the collector.
+ *
* @param collector
- * Collector in which the tweets are collected.
+ * Collector in which the tweets are collected.
*/
protected void collectOneMessage(Collector<String> collector) {
if (client.isDone()) {
if (LOG.isErrorEnabled()) {
- LOG.error("Client connection closed unexpectedly: "
- + client.getExitEvent().getMessage());
+ LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
+ .getMessage());
}
}
@@ -219,8 +222,7 @@ public class TwitterSource extends RichSourceFunction<String> {
collector.collect(msg);
} else {
if (LOG.isInfoEnabled()) {
- LOG.info("Did not receive a message in " + waitSec
- + " seconds");
+ LOG.info("Did not receive a message in {} seconds", waitSec);
}
}
} catch (InterruptedException e) {
@@ -243,7 +245,8 @@ public class TwitterSource extends RichSourceFunction<String> {
}
/**
- * Get the size of the queue in which the tweets are contained temporarily.
+ * Get the size of the queue in which the tweets are contained temporarily.
+ *
* @return
*/
public int getQueueSize() {
@@ -251,18 +254,19 @@ public class TwitterSource extends RichSourceFunction<String> {
}
/**
- * Set the size of the queue in which the tweets are contained temporarily.
+ * Set the size of the queue in which the tweets are contained temporarily.
+ *
* @param queueSize
- * The desired value.
+ * The desired value.
*/
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
-
+
/**
* This function tells how long TwitterSource waits for the tweets.
- * @return
- * Number of second.
+ *
+ * @return Number of second.
*/
public int getWaitSec() {
return waitSec;
@@ -270,8 +274,9 @@ public class TwitterSource extends RichSourceFunction<String> {
/**
* This function sets how long TwitterSource should wait for the tweets.
+ *
* @param waitSec
- * The desired value.
+ * The desired value.
*/
public void setWaitSec(int waitSec) {
this.waitSec = waitSec;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index b715d18..5927ce4 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -17,15 +17,15 @@
package org.apache.flink.streaming.connectors.twitter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TwitterStreaming {
@@ -33,7 +33,7 @@ public class TwitterStreaming {
private static final int SOURCE_PARALLELISM = 1;
private static final int NUMBEROFTWEETS = 100;
- private static final Log LOG = LogFactory.getLog(TwitterStreaming.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class);
public static class TwitterSink implements SinkFunction<Tuple5<Long, Integer, String, String, String>> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
new file mode 100644
index 0000000..b84c852
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class JSONParserTest {
+
+ private String jsonText;
+ private String searchedField;
+
+ public JSONParserTest(String text, String field) {
+ jsonText = text;
+ searchedField = field;
+ }
+
+ @Parameters
+ public static Collection<Object[]> initParameterList() {
+
+ Object[][] parameterList = new Object[][] {
+ { "{\"key\":\"value\"}", "key" },
+ { "{\"key\":[\"value\"]}", "key[0]" },
+ { "{\"key\":[{\"key\":\"value\"}]}", "key[0].key" },
+ { "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", "key[0].key[0].key"},
+ { "{\"key\":[1,[{\"key\":\"value\"}]]}", "key[1][0].key" },
+ { "{\"key\":[1,[[\"key\",2,\"value\"]]]}", "key[1][0][2]" },
+ { "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
+ };
+
+ return Arrays.asList(parameterList);
+ }
+
+ @Test
+ public void test() {
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+ String expected = "{\"retValue\":\"value\"}";
+
+ assertTrue(expected.equals(jo.toString()));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
new file mode 100644
index 0000000..6730c25
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+
+
+public class JSONParserTest2 {
+
+ @Test
+ public void testGetBooleanFunction() {
+ String jsonText = "{\"key\":true}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertTrue(jo.getBoolean("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetDoubleFunction() {
+ double expected = 12345.12345;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getDouble("retValue"),0.000001);
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetIntFunction() {
+ int expected = 15;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getInt("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetLongFunction() {
+ long expected = 111111111111L;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getLong("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index cd54a54..4bb022a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -164,17 +164,16 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
- public void addSource(String componentName,
- SourceInvokable<?> InvokableObject,
+ public void addSource(String componentName, SourceInvokable<?> InvokableObject,
TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
byte[] serializedFunction, int parallelism) {
- addComponent(componentName, StreamSource.class, InvokableObject,
- operatorName, serializedFunction, parallelism);
+ addComponent(componentName, StreamSource.class, InvokableObject, operatorName,
+ serializedFunction, parallelism);
addTypeWrappers(componentName, null, null, outTypeWrapper, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("SOURCE: " + componentName);
+ LOG.debug("SOURCE: {}", componentName);
}
}
@@ -193,11 +192,10 @@ public class JobGraphBuilder {
* @param waitTime
* Max wait time for next record
*/
- public void addIterationSource(String componentName, String iterationHead,
- String iterationID, int parallelism, long waitTime) {
+ public void addIterationSource(String componentName, String iterationHead, String iterationID,
+ int parallelism, long waitTime) {
- addComponent(componentName, StreamIterationSource.class, null, null,
- null, parallelism);
+ addComponent(componentName, StreamIterationSource.class, null, null, null, parallelism);
iterationIds.put(componentName, iterationID);
iterationIDtoSourceName.put(iterationID, componentName);
@@ -205,14 +203,13 @@ public class JobGraphBuilder {
setBytesFrom(iterationHead, componentName);
setEdge(componentName, iterationHead,
- connectionTypes.get(inEdgeList.get(iterationHead).get(0))
- .get(0), 0, new ArrayList<String>(), false);
+ connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0,
+ new ArrayList<String>(), false);
- iterationWaitTime.put(iterationIDtoSourceName.get(iterationID),
- waitTime);
+ iterationWaitTime.put(iterationIDtoSourceName.get(iterationID), waitTime);
if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SOURCE: " + componentName);
+ LOG.debug("ITERATION SOURCE: {}", componentName);
}
}
@@ -236,36 +233,32 @@ public class JobGraphBuilder {
*/
public <IN, OUT> void addTask(String componentName,
StreamOperatorInvokable<IN, OUT> taskInvokableObject,
- TypeSerializerWrapper<?> inTypeWrapper,
- TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
- byte[] serializedFunction, int parallelism) {
+ TypeSerializerWrapper<?> inTypeWrapper, TypeSerializerWrapper<?> outTypeWrapper,
+ String operatorName, byte[] serializedFunction, int parallelism) {
- addComponent(componentName, StreamTask.class, taskInvokableObject,
- operatorName, serializedFunction, parallelism);
+ addComponent(componentName, StreamTask.class, taskInvokableObject, operatorName,
+ serializedFunction, parallelism);
- addTypeWrappers(componentName, inTypeWrapper, null, outTypeWrapper,
- null);
+ addTypeWrappers(componentName, inTypeWrapper, null, outTypeWrapper, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("TASK: " + componentName);
+ LOG.debug("TASK: {}", componentName);
}
}
public <IN1, IN2, OUT> void addCoTask(String componentName,
CoInvokable<IN1, IN2, OUT> taskInvokableObject,
- TypeSerializerWrapper<?> in1TypeWrapper,
- TypeSerializerWrapper<?> in2TypeWrapper,
+ TypeSerializerWrapper<?> in1TypeWrapper, TypeSerializerWrapper<?> in2TypeWrapper,
TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
byte[] serializedFunction, int parallelism) {
- addComponent(componentName, CoStreamTask.class,
- taskInvokableObject, operatorName, serializedFunction,
- parallelism);
-
+ addComponent(componentName, CoStreamTask.class, taskInvokableObject, operatorName,
+ serializedFunction, parallelism);
+
addTypeWrappers(componentName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug("CO-TASK: " + componentName);
+ LOG.debug("CO-TASK: {}", componentName);
}
}
@@ -284,15 +277,15 @@ public class JobGraphBuilder {
* Number of parallel instances created
*/
public void addSink(String componentName, SinkInvokable<?> InvokableObject,
- TypeSerializerWrapper<?> inTypeWrapper, String operatorName,
- byte[] serializedFunction, int parallelism) {
+ TypeSerializerWrapper<?> inTypeWrapper, String operatorName, byte[] serializedFunction,
+ int parallelism) {
- addComponent(componentName, StreamSink.class,
- InvokableObject, operatorName, serializedFunction, parallelism);
+ addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
+ serializedFunction, parallelism);
addTypeWrappers(componentName, inTypeWrapper, null, null, null);
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK: " + componentName);
+ LOG.debug("SINK: {}", componentName);
}
}
@@ -315,19 +308,18 @@ public class JobGraphBuilder {
* @param waitTime
* Max waiting time for next record
*/
- public void addIterationSink(String componentName, String iterationTail,
- String iterationID, int parallelism, long waitTime) {
+ public void addIterationSink(String componentName, String iterationTail, String iterationID,
+ int parallelism, long waitTime) {
- addComponent(componentName, StreamIterationSink.class, null,
- null, null, parallelism);
+ addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
iterationIds.put(componentName, iterationID);
iterationIDtoSinkName.put(iterationID, componentName);
setBytesFrom(iterationTail, componentName);
- //setInTypeWrappersFrom(iterationTail, componentName);
+ // setInTypeWrappersFrom(iterationTail, componentName);
iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);
if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SINK: " + componentName);
+ LOG.debug("ITERATION SINK: {}", componentName);
}
}
@@ -351,9 +343,8 @@ public class JobGraphBuilder {
* Number of parallel instances created
*/
private void addComponent(String componentName,
- Class<? extends AbstractInvokable> componentClass,
- StreamInvokable<?> invokableObject, String operatorName,
- byte[] serializedFunction, int parallelism) {
+ Class<? extends AbstractInvokable> componentClass, StreamInvokable<?> invokableObject,
+ String operatorName, byte[] serializedFunction, int parallelism) {
componentClasses.put(componentName, componentClass);
setParallelism(componentName, parallelism);
@@ -366,14 +357,13 @@ public class JobGraphBuilder {
outEdgeNames.put(componentName, new ArrayList<List<String>>());
outEdgeSelectAll.put(componentName, new ArrayList<Boolean>());
inEdgeList.put(componentName, new ArrayList<String>());
- connectionTypes.put(componentName,
- new ArrayList<StreamPartitioner<?>>());
+ connectionTypes.put(componentName, new ArrayList<StreamPartitioner<?>>());
iterationTailCount.put(componentName, 0);
}
- private void addTypeWrappers(String componentName,
- TypeSerializerWrapper<?> in1, TypeSerializerWrapper<?> in2,
- TypeSerializerWrapper<?> out1, TypeSerializerWrapper<?> out2) {
+ private void addTypeWrappers(String componentName, TypeSerializerWrapper<?> in1,
+ TypeSerializerWrapper<?> in2, TypeSerializerWrapper<?> out1,
+ TypeSerializerWrapper<?> out2) {
typeWrapperIn1.put(componentName, in1);
typeWrapperIn2.put(componentName, in2);
typeWrapperOut1.put(componentName, out1);
@@ -390,10 +380,8 @@ public class JobGraphBuilder {
private void createVertex(String componentName) {
// Get vertex attributes
- Class<? extends AbstractInvokable> componentClass = componentClasses
- .get(componentName);
- StreamInvokable<?> invokableObject = invokableObjects
- .get(componentName);
+ Class<? extends AbstractInvokable> componentClass = componentClasses.get(componentName);
+ StreamInvokable<?> invokableObject = invokableObjects.get(componentName);
String operatorName = operatorNames.get(componentName);
byte[] serializedFunction = serializedFunctions.get(componentName);
int parallelism = componentParallelism.get(componentName);
@@ -417,20 +405,19 @@ public class JobGraphBuilder {
component.setInvokableClass(componentClass);
component.setNumberOfSubtasks(parallelism);
if (LOG.isDebugEnabled()) {
- LOG.debug("Parallelism set: " + parallelism + " for "
- + componentName);
+ LOG.debug("Parallelism set: {} for {}", parallelism, componentName);
}
StreamConfig config = new StreamConfig(component.getConfiguration());
config.setMutability(mutability.get(componentName));
config.setBufferTimeout(bufferTimeout.get(componentName));
-
+
config.setTypeWrapperIn1(typeWrapperIn1.get(componentName));
config.setTypeWrapperIn2(typeWrapperIn2.get(componentName));
config.setTypeWrapperOut1(typeWrapperOut1.get(componentName));
config.setTypeWrapperOut2(typeWrapperOut2.get(componentName));
-
+
// Set vertex config
config.setUserInvokable(invokableObject);
config.setComponentName(componentName);
@@ -486,10 +473,9 @@ public class JobGraphBuilder {
* @param outputNames
* User defined names of the out edge
*/
- public void setEdge(String upStreamComponentName,
- String downStreamComponentName,
- StreamPartitioner<?> partitionerObject, int typeNumber,
- List<String> outputNames, boolean selectAll) {
+ public void setEdge(String upStreamComponentName, String downStreamComponentName,
+ StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames,
+ boolean selectAll) {
outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
outEdgeType.get(upStreamComponentName).add(typeNumber);
inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
@@ -509,45 +495,38 @@ public class JobGraphBuilder {
* @param partitionerObject
* The partitioner
*/
- private <T> void connect(String upStreamComponentName,
- String downStreamComponentName,
+ private <T> void connect(String upStreamComponentName, String downStreamComponentName,
StreamPartitioner<T> partitionerObject) {
- AbstractJobVertex upStreamComponent = components
- .get(upStreamComponentName);
- AbstractJobVertex downStreamComponent = components
- .get(downStreamComponentName);
+ AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
+ AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
- StreamConfig config = new StreamConfig(
- upStreamComponent.getConfiguration());
+ StreamConfig config = new StreamConfig(upStreamComponent.getConfiguration());
try {
if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
- upStreamComponent.connectTo(downStreamComponent,
- ChannelType.NETWORK, DistributionPattern.POINTWISE);
+ upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK,
+ DistributionPattern.POINTWISE);
} else {
- upStreamComponent.connectTo(downStreamComponent,
- ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+ upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK,
+ DistributionPattern.BIPARTITE);
}
if (LOG.isDebugEnabled()) {
- LOG.debug("CONNECTED: "
- + partitionerObject.getClass().getSimpleName() + " - "
- + upStreamComponentName + " -> "
- + downStreamComponentName);
+ LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(),
+ upStreamComponentName, downStreamComponentName);
}
} catch (JobGraphDefinitionException e) {
- throw new RuntimeException("Cannot connect components: "
- + upStreamComponentName + " to " + downStreamComponentName,
- e);
+ throw new RuntimeException("Cannot connect components: " + upStreamComponentName
+ + " to " + downStreamComponentName, e);
}
int outputIndex = upStreamComponent.getNumberOfForwardConnections() - 1;
- config.setOutputName(outputIndex,
- outEdgeNames.get(upStreamComponentName).get(outputIndex));
- config.setSelectAll(outputIndex, outEdgeSelectAll.get(upStreamComponentName).get(outputIndex));
+ config.setOutputName(outputIndex, outEdgeNames.get(upStreamComponentName).get(outputIndex));
+ config.setSelectAll(outputIndex,
+ outEdgeSelectAll.get(upStreamComponentName).get(outputIndex));
config.setPartitioner(outputIndex, partitionerObject);
config.setNumberOfOutputChannels(outputIndex,
componentParallelism.get(downStreamComponentName));
@@ -562,12 +541,10 @@ public class JobGraphBuilder {
* @param iterationTail
* ID of the iteration tail
*/
- public void setIterationSourceSettings(String iterationID,
- String iterationTail) {
+ public void setIterationSourceSettings(String iterationID, String iterationTail) {
setParallelism(iterationIDtoSourceName.get(iterationID),
componentParallelism.get(iterationTail));
- setBufferTimeout(iterationIDtoSourceName.get(iterationID),
- bufferTimeout.get(iterationTail));
+ setBufferTimeout(iterationIDtoSourceName.get(iterationID), bufferTimeout.get(iterationTail));
}
/**
@@ -580,12 +557,11 @@ public class JobGraphBuilder {
* @param serializedOutputSelector
* Byte array representing the serialized output selector.
*/
- public <T> void setOutputSelector(String componentName,
- byte[] serializedOutputSelector) {
+ public <T> void setOutputSelector(String componentName, byte[] serializedOutputSelector) {
outputSelectors.put(componentName, serializedOutputSelector);
if (LOG.isDebugEnabled()) {
- LOG.debug("Outputselector set for " + componentName);
+ LOG.debug("Outputselector set for {}", componentName);
}
}
@@ -613,34 +589,33 @@ public class JobGraphBuilder {
}
public void setInToOutTypeWrappersFrom(String from, String to) {
- //TODO rename function
+ // TODO rename function
typeWrapperIn1.put(to, typeWrapperOut1.get(from));
typeWrapperIn2.put(to, typeWrapperOut2.get(from));
}
-
+
public void setOutToOutTypeWrappersFrom(String from, String to) {
- //TODO rename function
+ // TODO rename function
typeWrapperOut1.put(to, typeWrapperOut1.get(from));
typeWrapperOut2.put(to, typeWrapperOut2.get(from));
}
-
+
public void setInToInTypeWrappersFrom(String from, String to) {
- //TODO rename function
+ // TODO rename function
typeWrapperIn1.put(to, typeWrapperIn1.get(from));
typeWrapperIn2.put(to, typeWrapperIn2.get(from));
}
-
- public TypeInformation<?> getInTypeInfo(String id){
- // TODO
+
+ public TypeInformation<?> getInTypeInfo(String id) {
+ // TODO
System.out.println("DEBUG TypeInfo " + typeWrapperIn1.get(id));
return typeWrapperIn1.get(id).getTypeInfo();
}
-
- public TypeInformation<?> getOutTypeInfo(String id){
- // TODO
+
+ public TypeInformation<?> getOutTypeInfo(String id) {
+ // TODO
return typeWrapperOut1.get(id).getTypeInfo();
}
-
/**
* Sets instance sharing between the given components
@@ -662,13 +637,11 @@ public class JobGraphBuilder {
*/
private void setAutomaticInstanceSharing() {
- AbstractJobVertex maxParallelismVertex = components
- .get(maxParallelismVertexName);
+ AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
for (String componentName : components.keySet()) {
if (!componentName.equals(maxParallelismVertexName)) {
- components.get(componentName).setVertexToShareInstancesWith(
- maxParallelismVertex);
+ components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
}
@@ -679,9 +652,8 @@ public class JobGraphBuilder {
*/
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
- (new StreamConfig(component.getConfiguration()))
- .setNumberOfInputs(component
- .getNumberOfBackwardConnections());
+ (new StreamConfig(component.getConfiguration())).setNumberOfInputs(component
+ .getNumberOfBackwardConnections());
}
}
@@ -691,9 +663,8 @@ public class JobGraphBuilder {
*/
private void setNumberOfJobOutputs() {
for (AbstractJobVertex component : components.values()) {
- (new StreamConfig(component.getConfiguration()))
- .setNumberOfOutputs(component
- .getNumberOfForwardConnections());
+ (new StreamConfig(component.getConfiguration())).setNumberOfOutputs(component
+ .getNumberOfForwardConnections());
}
}
@@ -710,19 +681,15 @@ public class JobGraphBuilder {
for (String upStreamComponentName : outEdgeList.keySet()) {
int i = 0;
- List<Integer> outEdgeTypeList = outEdgeType
- .get(upStreamComponentName);
+ List<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
- for (String downStreamComponentName : outEdgeList
- .get(upStreamComponentName)) {
- StreamConfig downStreamComponentConfig = new StreamConfig(
- components.get(downStreamComponentName)
- .getConfiguration());
+ for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
+ StreamConfig downStreamComponentConfig = new StreamConfig(components.get(
+ downStreamComponentName).getConfiguration());
int inputNumber = downStreamComponentConfig.getNumberOfInputs();
- downStreamComponentConfig.setInputType(inputNumber++,
- outEdgeTypeList.get(i));
+ downStreamComponentConfig.setInputType(inputNumber++, outEdgeTypeList.get(i));
downStreamComponentConfig.setNumberOfInputs(inputNumber);
connect(upStreamComponentName, downStreamComponentName,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index ab6caea..42a2683 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
-
+
OutputSelector<OUT> outputSelector;
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> selectAllOutputs;
private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
@@ -88,9 +88,8 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
.get(outputName);
if (outputList == null) {
if (LOG.isErrorEnabled()) {
- LOG.error(String.format(
- "Cannot emit because no output is selected with the name: %s",
- outputName));
+ LOG.error("Cannot emit because no output is selected with the name: {}",
+ outputName);
}
}
@@ -110,8 +109,8 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
}
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Emit to %s failed due to: %s", outputName,
- StringUtils.stringifyException(e)));
+ LOG.error("Emit to {} failed due to: {}", outputName,
+ StringUtils.stringifyException(e));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 54cab72..7f38df1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -59,7 +59,7 @@ public class StreamCollector<OUT> implements Collector<OUT> {
public StreamCollector(int channelID,
SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
this.serializationDelegate = serializationDelegate;
-
+
if (serializationDelegate != null) {
this.streamRecord = serializationDelegate.getInstance();
} else {
@@ -104,7 +104,7 @@ public class StreamCollector<OUT> implements Collector<OUT> {
}
}
}
-
+
/**
* Collects and emits a tuple/object to the outputs by reusing a
* StreamRecord object.
@@ -129,15 +129,14 @@ public class StreamCollector<OUT> implements Collector<OUT> {
serializationDelegate.setInstance(streamRecord);
emitToOutputs();
}
-
+
protected void emitToOutputs() {
for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
try {
output.emit(serializationDelegate);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Emit failed due to: %s",
- StringUtils.stringifyException(e)));
+ LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index b2fcf89..b20e832 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -71,7 +71,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
@Override
public void execute() {
if (LOG.isInfoEnabled()) {
- LOG.info("Running remotely at " + host + ":" + port);
+ LOG.info("Running remotely at {}:{}", host, port);
}
JobGraph jobGraph = jobGraphBuilder.getJobGraph();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
index 799f647..558d11c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
@@ -102,8 +102,8 @@ public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<O
callUserFunction();
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Calling user function failed due to: %s",
- StringUtils.stringifyException(e)));
+ LOG.error("Calling user function failed due to: {}",
+ StringUtils.stringifyException(e));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index c21e784..b064df7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.invokable.operator.co;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -26,6 +24,8 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
@@ -34,7 +34,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
}
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(CoInvokable.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CoInvokable.class);
protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
protected StreamRecord<IN1> reuse1;
@@ -120,8 +120,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
callUserFunction1();
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Calling user function failed due to: %s",
- StringUtils.stringifyException(e)));
+ LOG.error("Calling user function failed due to: {}",
+ StringUtils.stringifyException(e));
}
}
}
@@ -131,8 +131,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
callUserFunction2();
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Calling user function failed due to: %s",
- StringUtils.stringifyException(e)));
+ LOG.error("Calling user function failed due to: {}",
+ StringUtils.stringifyException(e));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
index 76277dc..7382d7d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
@@ -35,9 +33,11 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.StreamRecordWriter;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.types.TypeInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class OutputHandler<OUT> {
- private static final Log LOG = LogFactory.getLog(OutputHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
private AbstractStreamComponent streamComponent;
private StreamConfig configuration;
@@ -128,14 +128,14 @@ public class OutputHandler<OUT> {
outputs.add(output);
List<String> outputName = configuration.getOutputName(outputNumber);
boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
-
+
if (collector != null) {
collector.addOutput(output, outputName, isSelectAllOutput);
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Partitioner set: " + outputPartitioner.getClass().getSimpleName() + " with "
- + outputNumber + " outputs");
+ LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
+ .getSimpleName(), outputNumber);
}
}
@@ -153,11 +153,11 @@ public class OutputHandler<OUT> {
long startTime;
- public void invokeUserFunction(String componentTypeName,
- StreamInvokable<OUT> userInvokable) throws IOException, InterruptedException {
+ public void invokeUserFunction(String componentTypeName, StreamInvokable<OUT> userInvokable)
+ throws IOException, InterruptedException {
if (LOG.isDebugEnabled()) {
- LOG.debug(componentTypeName + " " + streamComponent.getName()
- + " invoked with instance id " + streamComponent.getInstanceID());
+ LOG.debug("{} {} invoked with instance id {}", componentTypeName,
+ streamComponent.getName(), streamComponent.getInstanceID());
}
initializeOutputSerializers();
@@ -170,8 +170,8 @@ public class OutputHandler<OUT> {
}
if (LOG.isDebugEnabled()) {
- LOG.debug(componentTypeName + " " + streamComponent.getName()
- + " invoke finished with instance id " + streamComponent.getInstanceID());
+ LOG.debug("{} {} invoke finished with instance id {}", componentTypeName,
+ streamComponent.getName(), streamComponent.getInstanceID());
}
flushOutputs();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 8a1a637..40c4b96 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -27,13 +27,12 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.io.BlockingQueueBroker;
import org.apache.flink.util.StringUtils;
-public class StreamIterationSink<IN extends Tuple> extends
- AbstractStreamComponent {
+public class StreamIterationSink<IN extends Tuple> extends AbstractStreamComponent {
private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSink.class);
private InputHandler<IN> inputHandler;
-
+
private String iterationId;
@SuppressWarnings("rawtypes")
private BlockingQueue<StreamRecord> dataChannel;
@@ -61,13 +60,13 @@ public class StreamIterationSink<IN extends Tuple> extends
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK " + getName() + " invoked");
+ LOG.debug("SINK {} invoked", getName());
}
forwardRecords();
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK " + getName() + " invoke finished");
+ LOG.debug("SINK {} invoke finished", getName());
}
}
@@ -92,8 +91,8 @@ public class StreamIterationSink<IN extends Tuple> extends
}
} catch (InterruptedException e) {
if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Pushing back record at iteration %s failed due to: %s",
- iterationId, StringUtils.stringifyException(e)));
+ LOG.error("Pushing back record at iteration {} failed due to: {}", iterationId,
+ StringUtils.stringifyException(e));
}
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index ab02d84..3affc8c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -68,7 +68,7 @@ public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComp
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("SOURCE " + getName() + " invoked with instance id " + getInstanceID());
+ LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
}
outputHandler.initializeOutputSerializers();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 0797cc1..2754c89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -48,13 +48,13 @@ public class StreamSink<IN> extends AbstractStreamComponent {
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK " + getName() + " invoked");
+ LOG.debug("SINK {} invoked", getName());
}
invokeUserFunction(userInvokable);
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK " + getName() + " invoke finished");
+ LOG.debug("SINK {} invoke finished", getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
index 53eafaa..52407bc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
@@ -66,7 +66,7 @@ public class TestDataUtil {
if (file.exists()) {
if (LOG.isInfoEnabled()) {
- LOG.info(fileName + " already exists.");
+ LOG.info("{} already exists.", fileName);
}
try {
@@ -93,8 +93,10 @@ public class TestDataUtil {
}
public static void download(String fileName) {
- LOG.info("downloading " + fileName);
-
+ if (LOG.isInfoEnabled()) {
+ LOG.info("downloading {}", fileName);
+ }
+
try {
URL website = new URL(testRepoUrl + fileName);
BufferedReader bReader = new BufferedReader(new InputStreamReader(website.openStream()));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index a06ceca..9c9f00d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -42,6 +42,12 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
deleted file mode 100644
index d39996f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.function;
-
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * Abstract class derived from {@link RichFlatMapFunction} to handle JSON files.
- *
- * @param <IN>
- * Type of the input elements.
- * @param <OUT>
- * Type of the returned elements.
- */
-public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- // private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
-
- /**
- * Get the value object associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public Object get(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).get("retValue");
- }
-
- /**
- * Get the boolean value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public boolean getBoolean(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getBoolean("retValue");
- }
-
- /**
- * Get the double value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public double getDouble(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getDouble("retValue");
- }
-
- /**
- * Get the int value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public int getInt(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getInt("retValue");
- }
-
- /**
- * Get the long value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public long getLong(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getLong("retValue");
- }
-
- /**
- * Get the String value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public String getString(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getString("retValue");
- }
-}