Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:11:01 UTC

[18/18] git commit: [streaming] Updated logging to utilize SLF4J

[streaming] Updated logging to utilize SLF4J


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d0dd5138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d0dd5138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d0dd5138

Branch: refs/heads/master
Commit: d0dd5138fd0bddf2bf942bffee1681c298043b3e
Parents: 5f601cf
Author: ghermann <re...@gmail.com>
Authored: Wed Sep 10 11:58:36 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200

----------------------------------------------------------------------
 .../flink-streaming-connectors/pom.xml          |  10 +-
 .../streaming/connectors/flume/FlumeSink.java   |   2 +-
 .../connectors/flume/FlumeTopology.java         |  30 +--
 .../connectors/json/JSONParseFlatMap.java       | 144 +++++++++++++
 .../streaming/connectors/json/JSONParser.java   | 175 +++++++++++++++
 .../connectors/kafka/KafkaTopology.java         |  12 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |   2 +-
 .../connectors/rabbitmq/RMQSource.java          |  12 +-
 .../connectors/rabbitmq/RMQTopology.java        |  15 +-
 .../connectors/twitter/TwitterLocal.java        |   2 +-
 .../connectors/twitter/TwitterSource.java       |  71 ++++---
 .../connectors/twitter/TwitterStreaming.java    |   8 +-
 .../connectors/json/JSONParserTest.java         |  74 +++++++
 .../connectors/json/JSONParserTest2.java        |  95 +++++++++
 .../flink/streaming/api/JobGraphBuilder.java    | 211 ++++++++-----------
 .../api/collector/DirectedStreamCollector.java  |  11 +-
 .../api/collector/StreamCollector.java          |   9 +-
 .../environment/RemoteStreamEnvironment.java    |   2 +-
 .../api/invokable/StreamOperatorInvokable.java  |   4 +-
 .../api/invokable/operator/co/CoInvokable.java  |  14 +-
 .../api/streamcomponent/OutputHandler.java      |  24 +--
 .../streamcomponent/StreamIterationSink.java    |  13 +-
 .../streamcomponent/StreamIterationSource.java  |   2 +-
 .../api/streamcomponent/StreamSink.java         |   4 +-
 .../flink/streaming/util/TestDataUtil.java      |   8 +-
 .../flink-streaming-examples/pom.xml            |   6 +
 .../examples/function/JSONParseFlatMap.java     | 144 -------------
 .../streaming/examples/function/JSONParser.java | 175 ---------------
 .../examples/function/JSONParserTest.java       |  73 -------
 .../examples/function/JSONParserTest2.java      |  94 ---------
 30 files changed, 714 insertions(+), 732 deletions(-)
----------------------------------------------------------------------
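
The change that recurs across these files swaps commons-logging (Log/LogFactory) for SLF4J (Logger/LoggerFactory) and replaces string concatenation in log calls with parameterized messages behind level guards. A minimal sketch of the adopted pattern, using an illustrative class and message rather than one taken from the diff:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {

	private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

	public void connected(String host, int port) {
		// Old style (commons-logging): the message string is built eagerly,
		// even when INFO is disabled.
		// LOG.info("Connected to " + host + " at " + port);

		// New style (SLF4J): placeholders defer formatting, and the guard
		// skips the call entirely when the level is off.
		if (LOG.isInfoEnabled()) {
			LOG.info("Connected to {} at {}", host, port);
		}
	}
}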


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index ee99d7f..00fc675 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -43,12 +43,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-examples</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_2.10</artifactId>
 			<version>0.8.0</version>
@@ -65,6 +59,10 @@ under the License.
 					<groupId>log4j</groupId>
 					<artifactId>log4j</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index c4618a7..6bc5d8a 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -110,7 +110,7 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
 						Thread.sleep(1000);
 					} catch (InterruptedException e1) {
 						if (LOG.isErrorEnabled()) {
-							LOG.error("Interrupted while trying to connect " + port + " at " + host);
+							LOG.error("Interrupted while trying to connect {} at {}", port, host);
 						}
 					}
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 38ea6ef..73668c6 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -18,14 +18,15 @@
 package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.commons.lang.SerializationUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 public class FlumeTopology {
-	private static final Log LOG = LogFactory.getLog(FlumeTopology.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FlumeTopology.class);
+
 	public static class MyFlumeSink extends FlumeSink<String> {
 		private static final long serialVersionUID = 1L;
 
@@ -39,8 +40,8 @@ public class FlumeTopology {
 				try {
 					sendAndClose();
 				} catch (Exception e) {
-					throw new RuntimeException("Error while closing Flume connection with " + port + " at "
-							+ host, e);
+					throw new RuntimeException("Error while closing Flume connection with " + port
+							+ " at " + host, e);
 				}
 			}
 			return SerializationUtils.serialize(tuple);
@@ -53,12 +54,13 @@ public class FlumeTopology {
 
 		@Override
 		public void invoke(String value) {
-			LOG.info("String: <" + value + "> arrived from Flume");
-			
+			if (LOG.isInfoEnabled()) {
+				LOG.info("String: <{}> arrived from Flume", value);
+			}
 		}
-		
+
 	}
-	
+
 	public static class MyFlumeSource extends FlumeSource<String> {
 		private static final long serialVersionUID = 1L;
 
@@ -82,14 +84,12 @@ public class FlumeTopology {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
-		DataStream<String> dataStream1 = env
-			.addSource(new MyFlumeSource("localhost", 41414))
-			.addSink(new MyFlumePrintSink());
+		DataStream<String> dataStream1 = env.addSource(new MyFlumeSource("localhost", 41414))
+				.addSink(new MyFlumePrintSink());
 
 		@SuppressWarnings("unused")
-		DataStream<String> dataStream2 = env
-			.fromElements("one", "two", "three", "four", "five", "q")
-			.addSink(new MyFlumeSink("localhost", 42424));
+		DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
+				"q").addSink(new MyFlumeSink("localhost", 42424));
 
 		env.execute();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
new file mode 100644
index 0000000..96b1bf7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * Abstract class derived from {@link RichFlatMapFunction} to handle JSON files.
+ * 
+ * @param <IN>
+ *            Type of the input elements.
+ * @param <OUT>
+ *            Type of the returned elements.
+ */
+public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	// private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
+
+	/**
+	 * Get the value object associated with a key from a JSON code. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The object associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public Object get(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).get("retValue");
+	}
+
+	/**
+	 * Get the boolean value associated with a key from a JSON code. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The object associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public boolean getBoolean(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getBoolean("retValue");
+	}
+
+	/**
+	 * Get the double value associated with a key from a JSON code. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The object associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public double getDouble(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getDouble("retValue");
+	}
+
+	/**
+	 * Get the int value associated with a key from a JSON code. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The object associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public int getInt(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getInt("retValue");
+	}
+
+	/**
+	 * Get the long value associated with a key from a JSON code. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The object associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public long getLong(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getLong("retValue");
+	}
+	
+	/**
+	 * Get the String value associated with a key from a JSON code. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The object associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public String getString(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+		
+		return parser.parse(field).getString("retValue");
+	}
+}
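
As a usage sketch of the class above (a hypothetical subclass, not part of this commit), a concrete flat map can pull a nested field out of each incoming JSON record through the typed getters:

import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.flink.util.Collector;

// Hypothetical subclass: emits the value found under "user.name" in each record.
public class UserNameExtractor extends JSONParseFlatMap<String, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public void flatMap(String value, Collector<String> out) throws Exception {
		// getString() runs JSONParser over the record and unwraps "retValue".
		out.collect(getString(value, "user.name"));
	}
}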

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
new file mode 100644
index 0000000..4e34483
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+/**
+ * A JSONParser contains a JSONObject and provides opportunity to access
+ * embedded fields in JSON code.
+ */
+public class JSONParser {
+
+	private JSONObject originalJO;
+	private String searchedfield;
+	private Object temp;
+
+	/**
+	 * Construct a JSONParser from a string. The string has to be a JSON code
+	 * from which we want to get a field.
+	 * 
+	 * @param jsonText
+	 *            A string which contains a JSON code. String representation of
+	 *            a JSON code.
+	 * @throws JSONException
+	 *             If there is a syntax error in the source string.
+	 */
+	public JSONParser(String jsonText) throws JSONException {
+		originalJO = new JSONObject(jsonText);
+	}
+
+	/**
+	 * 
+	 * Parse the JSON code passed to the constructor to find the given key.
+	 * 
+	 * @param key
+	 *            The key whose value is searched for.
+	 * @return A JSONObject which has only one field called "retValue" and the
+	 *         value associated to it is the searched value. The methods of
+	 *         JSONObject can be used to get the field value in a desired
+	 *         format.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	public JSONObject parse(String key) throws JSONException {
+		initializeParser(key);
+		parsing();
+		return putResultInJSONObj();
+	}
+
+	/**
+	 * Prepare the fields of the class for the parsing
+	 * 
+	 * @param key
+	 *            The key whose value is searched for.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void initializeParser(String key) throws JSONException {
+		searchedfield = key;
+		temp = new JSONObject(originalJO.toString());
+	}
+
+	/**
+	 * This function goes through the given field and calls the appropriate
+	 * functions to treat the units between the punctuation marks.
+	 * 
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void parsing() throws JSONException {
+		StringTokenizer st = new StringTokenizer(searchedfield, ".");
+		while (st.hasMoreTokens()) {
+			find(st.nextToken());
+		}
+	}
+
+	/**
+	 * Search for the next part of the field and update the state if it was
+	 * found.
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void find(String nextToken) throws JSONException {
+		if (endsWithBracket(nextToken)) {
+			treatAllBracket(nextToken);
+		} else {
+			temp = ((JSONObject) temp).get(nextToken);
+		}
+	}
+
+	/**
+	 * Determine whether the given string ends with a closing square bracket ']'
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @return True if the given string ends with a closing square bracket ']'
+	 *         and false otherwise.
+	 */
+	private boolean endsWithBracket(String nextToken) {
+		return nextToken.substring(nextToken.length() - 1).endsWith("]");
+	}
+
+	/**
+	 * Handle (multidimensional) arrays. Treat the square bracket pairs one
+	 * after the other if necessary.
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @throws JSONException
+	 *             If the searched element is not found.
+	 */
+	private void treatAllBracket(String nextToken) throws JSONException {
+		List<String> list = Arrays.asList(nextToken.split("\\["));
+		ListIterator<String> iter = list.listIterator();
+
+		temp = ((JSONObject) temp).get(iter.next());
+
+		while (iter.hasNext()) {
+			int index = Integer.parseInt(cutBracket(iter.next()));
+			temp = ((JSONArray) temp).get(index);
+		}
+	}
+
+	/**
+	 * Remove the last character of the string.
+	 * 
+	 * @param string
+	 *            String to modify.
+	 * @return The given string without the last character.
+	 */
+	private String cutBracket(String string) {
+		return string.substring(0, string.length() - 1);
+	}
+
+	/**
+	 * Save the result of the search into a JSONObject.
+	 * 
+	 * @return A special JSONObject which contains only one key. The value
+	 *         associated to this key is the result of the search.
+	 * @throws JSONException
+	 *             If there is a problem creating the JSONObject. (e.g. invalid
+	 *             syntax)
+	 */
+	private JSONObject putResultInJSONObj() throws JSONException {
+		JSONObject jo = new JSONObject();
+		jo.put("retValue", temp);
+		return jo;
+	}
+
+}
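
The parse() key syntax mixes dot-separated object keys with bracketed array indices, as exercised by the parameterized tests further down. A standalone sketch using one of those test inputs:

import org.apache.flink.streaming.connectors.json.JSONParser;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;

public class JSONParserSketch {

	public static void main(String[] args) throws JSONException {
		String json = "{\"key\":[{\"key\":\"value\"}]}";

		// "key[0].key" steps into the array's first element, then its "key" field.
		JSONParser parser = new JSONParser(json);
		JSONObject result = parser.parse("key[0].key");

		// The searched value is always wrapped under the single key "retValue".
		System.out.println(result.getString("retValue")); // prints: value
	}
}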

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 2c89471..64ea810 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -18,16 +18,16 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaTopology {
-	private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
 	
 	public static final class MySource implements SourceFunction<Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
@@ -82,7 +82,9 @@ public class KafkaTopology {
 
 		@Override
 		public void invoke(Tuple1<String> value) {
-			LOG.info("String: " + value + " arrived from Kafka");
+			if (LOG.isInfoEnabled()) {
+				LOG.info("String: <{}> arrived from Kafka", value);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index ae04298..22e9aae 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -84,7 +84,7 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
 			}
 		} catch (IOException e) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
+				LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 1fcd57b..4f2feba 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -19,11 +19,11 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
-import net.spy.memcached.compat.log.Logger;
-import net.spy.memcached.compat.log.LoggerFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
@@ -86,7 +86,9 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 			try {
 				delivery = consumer.nextDelivery();
 			} catch (Exception e) {
-				LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Cannot receive RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+				}
 			}
 
 			outTuple = deserialize(delivery.getBody());
@@ -103,8 +105,8 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 		try {
 			connection.close();
 		} catch (IOException e) {
-			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME + " at "
-					+ HOST_NAME, e);
+			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+					+ " at " + HOST_NAME, e);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index 6cdda17..6d343dc 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -18,15 +18,15 @@
 package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.commons.lang.SerializationUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.KafkaTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RMQTopology {
-	private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
 	
 	public static final class MyRMQSink extends RMQSink<String> {
 		public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
@@ -50,8 +50,9 @@ public class RMQTopology {
 
 		@Override
 		public void invoke(String value) {
-			LOG.info("String: <" + value + "> arrived from RMQ");
-			
+			if (LOG.isInfoEnabled()) {
+				LOG.info("String: <{}> arrived from RMQ", value);
+			}
 		}
 		
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 06a1308..465a500 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 4aa7a43..525f4c8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -41,8 +41,8 @@ import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.OAuth1;
 
 /**
- * Implementation of {@link SourceFunction} specialized to emit tweets from Twitter.
- * It can connect to Twitter Streaming API, collect tweets and 
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. It can connect to Twitter Streaming API, collect tweets and
  */
 public class TwitterSource extends RichSourceFunction<String> {
 
@@ -60,8 +60,10 @@ public class TwitterSource extends RichSourceFunction<String> {
 
 	/**
 	 * Create {@link TwitterSource} for streaming
+	 * 
 	 * @param authPath
-	 * Location of the properties file containing the required authentication information. 
+	 *            Location of the properties file containing the required
+	 *            authentication information.
 	 */
 	public TwitterSource(String authPath) {
 		this.authPath = authPath;
@@ -69,10 +71,11 @@ public class TwitterSource extends RichSourceFunction<String> {
 	}
 
 	/**
-	 * Create {@link TwitterSource} to 
-	 * collect finite number of tweets
+	 * Create {@link TwitterSource} to collect finite number of tweets
+	 * 
 	 * @param authPath
-	 * Location of the properties file containing the required authentication information. 
+	 *            Location of the properties file containing the required
+	 *            authentication information.
 	 * @param numberOfTweets
 	 * 
 	 */
@@ -86,17 +89,17 @@ public class TwitterSource extends RichSourceFunction<String> {
 	public void open(Configuration parameters) throws Exception {
 		initializeConnection();
 	}
-	
+
 	@Override
 	public void invoke(Collector<String> collector) throws Exception {
-		
+
 		if (streaming) {
 			collectMessages(collector);
 		} else {
 			collectFiniteMessages(collector);
 		}
 	}
-	
+
 	@Override
 	public void close() throws Exception {
 		closeConnection();
@@ -136,9 +139,9 @@ public class TwitterSource extends RichSourceFunction<String> {
 	}
 
 	/**
-	 * Reads the given properties file for the authentication data.   
-	 * @return
-	 * the authentication data.
+	 * Reads the given properties file for the authentication data.
+	 * 
+	 * @return the authentication data.
 	 */
 	private Properties loadAuthenticationProperties() {
 		Properties properties = new Properties();
@@ -147,18 +150,15 @@ public class TwitterSource extends RichSourceFunction<String> {
 			properties.load(input);
 			input.close();
 		} catch (IOException ioe) {
-			new RuntimeException("Cannot open .properties file: " + authPath,
-					ioe);
+			throw new RuntimeException("Cannot open .properties file: " + authPath, ioe);
 		}
 		return properties;
 	}
 
-	private void initializeClient(StatusesSampleEndpoint endpoint,
-			Authentication auth) {
+	private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) {
 
-		client = new ClientBuilder().name("twitterSourceClient")
-				.hosts(Constants.STREAM_HOST).endpoint(endpoint)
-				.authentication(auth)
+		client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
+				.endpoint(endpoint).authentication(auth)
 				.processor(new StringDelimitedProcessor(queue)).build();
 
 		client.connect();
@@ -166,8 +166,9 @@ public class TwitterSource extends RichSourceFunction<String> {
 
 	/**
 	 * Put tweets into collector
+	 * 
 	 * @param collector
-	 * Collector in which the tweets are collected.
+	 *            Collector in which the tweets are collected.
 	 */
 	protected void collectFiniteMessages(Collector<String> collector) {
 
@@ -186,8 +187,9 @@ public class TwitterSource extends RichSourceFunction<String> {
 
 	/**
 	 * Put tweets into collector
+	 * 
 	 * @param collector
-	 * Collector in which the tweets are collected.
+	 *            Collector in which the tweets are collected.
 	 */
 	protected void collectMessages(Collector<String> collector) {
 
@@ -202,14 +204,15 @@ public class TwitterSource extends RichSourceFunction<String> {
 
 	/**
 	 * Put one tweet into the collector.
+	 * 
 	 * @param collector
-	 * Collector in which the tweets are collected.
+	 *            Collector in which the tweets are collected.
 	 */
 	protected void collectOneMessage(Collector<String> collector) {
 		if (client.isDone()) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error("Client connection closed unexpectedly: "
-						+ client.getExitEvent().getMessage());
+				LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
+						.getMessage());
 			}
 		}
 
@@ -219,8 +222,7 @@ public class TwitterSource extends RichSourceFunction<String> {
 				collector.collect(msg);
 			} else {
 				if (LOG.isInfoEnabled()) {
-					LOG.info("Did not receive a message in " + waitSec
-							+ " seconds");
+					LOG.info("Did not receive a message in {} seconds", waitSec);
 				}
 			}
 		} catch (InterruptedException e) {
@@ -243,7 +245,8 @@ public class TwitterSource extends RichSourceFunction<String> {
 	}
 
 	/**
-	 * Get the size of the queue in which the tweets are contained temporarily. 
+	 * Get the size of the queue in which the tweets are contained temporarily.
+	 * 
 	 * @return
 	 */
 	public int getQueueSize() {
@@ -251,18 +254,19 @@ public class TwitterSource extends RichSourceFunction<String> {
 	}
 
 	/**
-	 * Set the size of the queue in which the tweets are contained temporarily. 
+	 * Set the size of the queue in which the tweets are contained temporarily.
+	 * 
 	 * @param queueSize
-	 * The desired value.
+	 *            The desired value.
 	 */
 	public void setQueueSize(int queueSize) {
 		this.queueSize = queueSize;
 	}
-	
+
 	/**
 	 * This function tells how long TwitterSource waits for the tweets.
-	 * @return
-	 * Number of second.
+	 * 
+	 * @return Number of seconds.
 	 */
 	public int getWaitSec() {
 		return waitSec;
@@ -270,8 +274,9 @@ public class TwitterSource extends RichSourceFunction<String> {
 
 	/**
 	 * This function sets how long TwitterSource should wait for the tweets.
+	 * 
 	 * @param waitSec
-	 * The desired value.
+	 *            The desired value.
 	 */
 	public void setWaitSec(int waitSec) {
 		this.waitSec = waitSec;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index b715d18..5927ce4 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -17,15 +17,15 @@
 
 package org.apache.flink.streaming.connectors.twitter;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TwitterStreaming {
 
@@ -33,7 +33,7 @@ public class TwitterStreaming {
 	private static final int SOURCE_PARALLELISM = 1;
 	private static final int NUMBEROFTWEETS = 100;
 
-	private static final Log LOG = LogFactory.getLog(TwitterStreaming.class);
+	private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class);
 
 	public static class TwitterSink implements SinkFunction<Tuple5<Long, Integer, String, String, String>> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
new file mode 100644
index 0000000..b84c852
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class JSONParserTest {
+
+	private String jsonText;
+	private String searchedField;
+
+	public JSONParserTest(String text, String field) {
+		jsonText = text;
+		searchedField = field;
+	}
+
+	@Parameters
+	public static Collection<Object[]> initParameterList() {
+
+		Object[][] parameterList = new Object[][] { 
+				{ "{\"key\":\"value\"}", 							"key" },
+				{ "{\"key\":[\"value\"]}", 							"key[0]" },
+				{ "{\"key\":[{\"key\":\"value\"}]}", 				"key[0].key" },
+				{ "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", 	"key[0].key[0].key"},
+				{ "{\"key\":[1,[{\"key\":\"value\"}]]}", 			"key[1][0].key" },
+				{ "{\"key\":[1,[[\"key\",2,\"value\"]]]}", 			"key[1][0][2]" },
+				{ "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
+				};
+
+		return Arrays.asList(parameterList);
+	}
+
+	@Test
+	public void test() {
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+			String expected = "{\"retValue\":\"value\"}";
+
+			assertTrue(expected.equals(jo.toString()));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
new file mode 100644
index 0000000..6730c25
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+
+
+public class JSONParserTest2 {
+	
+	@Test
+	public void testGetBooleanFunction() {
+		String jsonText = "{\"key\":true}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertTrue(jo.getBoolean("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+	@Test
+	public void testGetDoubleFunction() {
+		double expected = 12345.12345;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getDouble("retValue"),0.000001);
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+	@Test
+	public void testGetIntFunction() {
+		int expected = 15;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getInt("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+
+	@Test
+	public void testGetLongFunction() {
+		long expected = 111111111111L;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getLong("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index cd54a54..4bb022a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -164,17 +164,16 @@ public class JobGraphBuilder {
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	public void addSource(String componentName,
-			SourceInvokable<?> InvokableObject,
+	public void addSource(String componentName, SourceInvokable<?> InvokableObject,
 			TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, StreamSource.class, InvokableObject,
-				operatorName, serializedFunction, parallelism);
+		addComponent(componentName, StreamSource.class, InvokableObject, operatorName,
+				serializedFunction, parallelism);
 		addTypeWrappers(componentName, null, null, outTypeWrapper, null);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE: " + componentName);
+			LOG.debug("SOURCE: {}", componentName);
 		}
 	}
 
@@ -193,11 +192,10 @@ public class JobGraphBuilder {
 	 * @param waitTime
 	 *            Max wait time for next record
 	 */
-	public void addIterationSource(String componentName, String iterationHead,
-			String iterationID, int parallelism, long waitTime) {
+	public void addIterationSource(String componentName, String iterationHead, String iterationID,
+			int parallelism, long waitTime) {
 
-		addComponent(componentName, StreamIterationSource.class, null, null,
-				null, parallelism);
+		addComponent(componentName, StreamIterationSource.class, null, null, null, parallelism);
 
 		iterationIds.put(componentName, iterationID);
 		iterationIDtoSourceName.put(iterationID, componentName);
@@ -205,14 +203,13 @@ public class JobGraphBuilder {
 		setBytesFrom(iterationHead, componentName);
 
 		setEdge(componentName, iterationHead,
-				connectionTypes.get(inEdgeList.get(iterationHead).get(0))
-						.get(0), 0, new ArrayList<String>(), false);
+				connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0,
+				new ArrayList<String>(), false);
 
-		iterationWaitTime.put(iterationIDtoSourceName.get(iterationID),
-				waitTime);
+		iterationWaitTime.put(iterationIDtoSourceName.get(iterationID), waitTime);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SOURCE: " + componentName);
+			LOG.debug("ITERATION SOURCE: {}", componentName);
 		}
 	}
 
@@ -236,36 +233,32 @@ public class JobGraphBuilder {
 	 */
 	public <IN, OUT> void addTask(String componentName,
 			StreamOperatorInvokable<IN, OUT> taskInvokableObject,
-			TypeSerializerWrapper<?> inTypeWrapper,
-			TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
-			byte[] serializedFunction, int parallelism) {
+			TypeSerializerWrapper<?> inTypeWrapper, TypeSerializerWrapper<?> outTypeWrapper,
+			String operatorName, byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, StreamTask.class, taskInvokableObject,
-				operatorName, serializedFunction, parallelism);
+		addComponent(componentName, StreamTask.class, taskInvokableObject, operatorName,
+				serializedFunction, parallelism);
 
-		addTypeWrappers(componentName, inTypeWrapper, null, outTypeWrapper,
-				null);
+		addTypeWrappers(componentName, inTypeWrapper, null, outTypeWrapper, null);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("TASK: " + componentName);
+			LOG.debug("TASK: {}", componentName);
 		}
 	}
 
 	public <IN1, IN2, OUT> void addCoTask(String componentName,
 			CoInvokable<IN1, IN2, OUT> taskInvokableObject,
-			TypeSerializerWrapper<?> in1TypeWrapper,
-			TypeSerializerWrapper<?> in2TypeWrapper,
+			TypeSerializerWrapper<?> in1TypeWrapper, TypeSerializerWrapper<?> in2TypeWrapper,
 			TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, CoStreamTask.class,
-				taskInvokableObject, operatorName, serializedFunction,
-				parallelism);
-		
+		addComponent(componentName, CoStreamTask.class, taskInvokableObject, operatorName,
+				serializedFunction, parallelism);
+
 		addTypeWrappers(componentName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
-		
+
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("CO-TASK: " + componentName);
+			LOG.debug("CO-TASK: {}", componentName);
 		}
 	}
 
@@ -284,15 +277,15 @@ public class JobGraphBuilder {
 	 *            Number of parallel instances created
 	 */
 	public void addSink(String componentName, SinkInvokable<?> InvokableObject,
-			TypeSerializerWrapper<?> inTypeWrapper, String operatorName,
-			byte[] serializedFunction, int parallelism) {
+			TypeSerializerWrapper<?> inTypeWrapper, String operatorName, byte[] serializedFunction,
+			int parallelism) {
 
-		addComponent(componentName, StreamSink.class,
-				InvokableObject, operatorName, serializedFunction, parallelism);
+		addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
+				serializedFunction, parallelism);
 		addTypeWrappers(componentName, inTypeWrapper, null, null, null);
-		
+
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK: " + componentName);
+			LOG.debug("SINK: {}", componentName);
 		}
 
 	}
@@ -315,19 +308,18 @@ public class JobGraphBuilder {
 	 * @param waitTime
 	 *            Max waiting time for next record
 	 */
-	public void addIterationSink(String componentName, String iterationTail,
-			String iterationID, int parallelism, long waitTime) {
+	public void addIterationSink(String componentName, String iterationTail, String iterationID,
+			int parallelism, long waitTime) {
 
-		addComponent(componentName, StreamIterationSink.class, null,
-				null, null, parallelism);
+		addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
 		iterationIds.put(componentName, iterationID);
 		iterationIDtoSinkName.put(iterationID, componentName);
 		setBytesFrom(iterationTail, componentName);
-		//setInTypeWrappersFrom(iterationTail, componentName);
+		// setInTypeWrappersFrom(iterationTail, componentName);
 		iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SINK: " + componentName);
+			LOG.debug("ITERATION SINK: {}", componentName);
 		}
 
 	}
@@ -351,9 +343,8 @@ public class JobGraphBuilder {
 	 *            Number of parallel instances created
 	 */
 	private void addComponent(String componentName,
-			Class<? extends AbstractInvokable> componentClass,
-			StreamInvokable<?> invokableObject, String operatorName,
-			byte[] serializedFunction, int parallelism) {
+			Class<? extends AbstractInvokable> componentClass, StreamInvokable<?> invokableObject,
+			String operatorName, byte[] serializedFunction, int parallelism) {
 
 		componentClasses.put(componentName, componentClass);
 		setParallelism(componentName, parallelism);
@@ -366,14 +357,13 @@ public class JobGraphBuilder {
 		outEdgeNames.put(componentName, new ArrayList<List<String>>());
 		outEdgeSelectAll.put(componentName, new ArrayList<Boolean>());
 		inEdgeList.put(componentName, new ArrayList<String>());
-		connectionTypes.put(componentName,
-				new ArrayList<StreamPartitioner<?>>());
+		connectionTypes.put(componentName, new ArrayList<StreamPartitioner<?>>());
 		iterationTailCount.put(componentName, 0);
 	}
 
-	private void addTypeWrappers(String componentName,
-			TypeSerializerWrapper<?> in1, TypeSerializerWrapper<?> in2,
-			TypeSerializerWrapper<?> out1, TypeSerializerWrapper<?> out2) {
+	private void addTypeWrappers(String componentName, TypeSerializerWrapper<?> in1,
+			TypeSerializerWrapper<?> in2, TypeSerializerWrapper<?> out1,
+			TypeSerializerWrapper<?> out2) {
 		typeWrapperIn1.put(componentName, in1);
 		typeWrapperIn2.put(componentName, in2);
 		typeWrapperOut1.put(componentName, out1);
@@ -390,10 +380,8 @@ public class JobGraphBuilder {
 	private void createVertex(String componentName) {
 
 		// Get vertex attributes
-		Class<? extends AbstractInvokable> componentClass = componentClasses
-				.get(componentName);
-		StreamInvokable<?> invokableObject = invokableObjects
-				.get(componentName);
+		Class<? extends AbstractInvokable> componentClass = componentClasses.get(componentName);
+		StreamInvokable<?> invokableObject = invokableObjects.get(componentName);
 		String operatorName = operatorNames.get(componentName);
 		byte[] serializedFunction = serializedFunctions.get(componentName);
 		int parallelism = componentParallelism.get(componentName);
@@ -417,20 +405,19 @@ public class JobGraphBuilder {
 		component.setInvokableClass(componentClass);
 		component.setNumberOfSubtasks(parallelism);
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Parallelism set: " + parallelism + " for "
-					+ componentName);
+			LOG.debug("Parallelism set: {} for {}", parallelism, componentName);
 		}
 
 		StreamConfig config = new StreamConfig(component.getConfiguration());
 
 		config.setMutability(mutability.get(componentName));
 		config.setBufferTimeout(bufferTimeout.get(componentName));
-		
+
 		config.setTypeWrapperIn1(typeWrapperIn1.get(componentName));
 		config.setTypeWrapperIn2(typeWrapperIn2.get(componentName));
 		config.setTypeWrapperOut1(typeWrapperOut1.get(componentName));
 		config.setTypeWrapperOut2(typeWrapperOut2.get(componentName));
-		
+
 		// Set vertex config
 		config.setUserInvokable(invokableObject);
 		config.setComponentName(componentName);
@@ -486,10 +473,9 @@ public class JobGraphBuilder {
 	 * @param outputNames
 	 *            User defined names of the out edge
 	 */
-	public void setEdge(String upStreamComponentName,
-			String downStreamComponentName,
-			StreamPartitioner<?> partitionerObject, int typeNumber,
-			List<String> outputNames, boolean selectAll) {
+	public void setEdge(String upStreamComponentName, String downStreamComponentName,
+			StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames,
+			boolean selectAll) {
 		outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
 		outEdgeType.get(upStreamComponentName).add(typeNumber);
 		inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
@@ -509,45 +495,38 @@ public class JobGraphBuilder {
 	 * @param partitionerObject
 	 *            The partitioner
 	 */
-	private <T> void connect(String upStreamComponentName,
-			String downStreamComponentName,
+	private <T> void connect(String upStreamComponentName, String downStreamComponentName,
 			StreamPartitioner<T> partitionerObject) {
 
-		AbstractJobVertex upStreamComponent = components
-				.get(upStreamComponentName);
-		AbstractJobVertex downStreamComponent = components
-				.get(downStreamComponentName);
+		AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
+		AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
 
-		StreamConfig config = new StreamConfig(
-				upStreamComponent.getConfiguration());
+		StreamConfig config = new StreamConfig(upStreamComponent.getConfiguration());
 
 		try {
 			if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
-				upStreamComponent.connectTo(downStreamComponent,
-						ChannelType.NETWORK, DistributionPattern.POINTWISE);
+				upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK,
+						DistributionPattern.POINTWISE);
 			} else {
-				upStreamComponent.connectTo(downStreamComponent,
-						ChannelType.NETWORK, DistributionPattern.BIPARTITE);
+				upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK,
+						DistributionPattern.BIPARTITE);
 			}
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("CONNECTED: "
-						+ partitionerObject.getClass().getSimpleName() + " - "
-						+ upStreamComponentName + " -> "
-						+ downStreamComponentName);
+				LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(),
+						upStreamComponentName, downStreamComponentName);
 			}
 
 		} catch (JobGraphDefinitionException e) {
-			throw new RuntimeException("Cannot connect components: "
-					+ upStreamComponentName + " to " + downStreamComponentName,
-					e);
+			throw new RuntimeException("Cannot connect components: " + upStreamComponentName
+					+ " to " + downStreamComponentName, e);
 		}
 
 		int outputIndex = upStreamComponent.getNumberOfForwardConnections() - 1;
 
-		config.setOutputName(outputIndex,
-				outEdgeNames.get(upStreamComponentName).get(outputIndex));
-		config.setSelectAll(outputIndex, outEdgeSelectAll.get(upStreamComponentName).get(outputIndex));
+		config.setOutputName(outputIndex, outEdgeNames.get(upStreamComponentName).get(outputIndex));
+		config.setSelectAll(outputIndex,
+				outEdgeSelectAll.get(upStreamComponentName).get(outputIndex));
 		config.setPartitioner(outputIndex, partitionerObject);
 		config.setNumberOfOutputChannels(outputIndex,
 				componentParallelism.get(downStreamComponentName));
@@ -562,12 +541,10 @@ public class JobGraphBuilder {
 	 * @param iterationTail
 	 *            ID of the iteration tail
 	 */
-	public void setIterationSourceSettings(String iterationID,
-			String iterationTail) {
+	public void setIterationSourceSettings(String iterationID, String iterationTail) {
 		setParallelism(iterationIDtoSourceName.get(iterationID),
 				componentParallelism.get(iterationTail));
-		setBufferTimeout(iterationIDtoSourceName.get(iterationID),
-				bufferTimeout.get(iterationTail));
+		setBufferTimeout(iterationIDtoSourceName.get(iterationID), bufferTimeout.get(iterationTail));
 	}
 
 	/**
@@ -580,12 +557,11 @@ public class JobGraphBuilder {
 	 * @param serializedOutputSelector
 	 *            Byte array representing the serialized output selector.
 	 */
-	public <T> void setOutputSelector(String componentName,
-			byte[] serializedOutputSelector) {
+	public <T> void setOutputSelector(String componentName, byte[] serializedOutputSelector) {
 		outputSelectors.put(componentName, serializedOutputSelector);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Outputselector set for " + componentName);
+			LOG.debug("OutputSelector set for {}", componentName);
 		}
 
 	}
@@ -613,34 +589,33 @@ public class JobGraphBuilder {
 	}
 
 	public void setInToOutTypeWrappersFrom(String from, String to) {
-		//TODO rename function
+		// TODO rename function
 		typeWrapperIn1.put(to, typeWrapperOut1.get(from));
 		typeWrapperIn2.put(to, typeWrapperOut2.get(from));
 	}
-	
+
 	public void setOutToOutTypeWrappersFrom(String from, String to) {
-		//TODO rename function
+		// TODO rename function
 		typeWrapperOut1.put(to, typeWrapperOut1.get(from));
 		typeWrapperOut2.put(to, typeWrapperOut2.get(from));
 	}
-	
+
 	public void setInToInTypeWrappersFrom(String from, String to) {
-		//TODO rename function
+		// TODO rename function
 		typeWrapperIn1.put(to, typeWrapperIn1.get(from));
 		typeWrapperIn2.put(to, typeWrapperIn2.get(from));
 	}
-	
-	public TypeInformation<?> getInTypeInfo(String id){
-		// TODO 
+
+	public TypeInformation<?> getInTypeInfo(String id) {
+		// TODO
 		System.out.println("DEBUG TypeInfo " + typeWrapperIn1.get(id));
 		return typeWrapperIn1.get(id).getTypeInfo();
 	}
-	
-	public TypeInformation<?> getOutTypeInfo(String id){
-		// TODO 
+
+	public TypeInformation<?> getOutTypeInfo(String id) {
+		// TODO
 		return typeWrapperOut1.get(id).getTypeInfo();
 	}
-	
 
 	/**
 	 * Sets instance sharing between the given components
@@ -662,13 +637,11 @@ public class JobGraphBuilder {
 	 */
 	private void setAutomaticInstanceSharing() {
 
-		AbstractJobVertex maxParallelismVertex = components
-				.get(maxParallelismVertexName);
+		AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
 
 		for (String componentName : components.keySet()) {
 			if (!componentName.equals(maxParallelismVertexName)) {
-				components.get(componentName).setVertexToShareInstancesWith(
-						maxParallelismVertex);
+				components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
 			}
 		}
 
@@ -679,9 +652,8 @@ public class JobGraphBuilder {
 	 */
 	private void setNumberOfJobInputs() {
 		for (AbstractJobVertex component : components.values()) {
-			(new StreamConfig(component.getConfiguration()))
-					.setNumberOfInputs(component
-							.getNumberOfBackwardConnections());
+			(new StreamConfig(component.getConfiguration())).setNumberOfInputs(component
+					.getNumberOfBackwardConnections());
 		}
 	}
 
@@ -691,9 +663,8 @@ public class JobGraphBuilder {
 	 */
 	private void setNumberOfJobOutputs() {
 		for (AbstractJobVertex component : components.values()) {
-			(new StreamConfig(component.getConfiguration()))
-					.setNumberOfOutputs(component
-							.getNumberOfForwardConnections());
+			(new StreamConfig(component.getConfiguration())).setNumberOfOutputs(component
+					.getNumberOfForwardConnections());
 		}
 	}
 
@@ -710,19 +681,15 @@ public class JobGraphBuilder {
 		for (String upStreamComponentName : outEdgeList.keySet()) {
 			int i = 0;
 
-			List<Integer> outEdgeTypeList = outEdgeType
-					.get(upStreamComponentName);
+			List<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
 
-			for (String downStreamComponentName : outEdgeList
-					.get(upStreamComponentName)) {
-				StreamConfig downStreamComponentConfig = new StreamConfig(
-						components.get(downStreamComponentName)
-								.getConfiguration());
+			for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
+				StreamConfig downStreamComponentConfig = new StreamConfig(components.get(
+						downStreamComponentName).getConfiguration());
 
 				int inputNumber = downStreamComponentConfig.getNumberOfInputs();
 
-				downStreamComponentConfig.setInputType(inputNumber++,
-						outEdgeTypeList.get(i));
+				downStreamComponentConfig.setInputType(inputNumber++, outEdgeTypeList.get(i));
 				downStreamComponentConfig.setNumberOfInputs(inputNumber);
 
 				connect(upStreamComponentName, downStreamComponentName,

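The JobGraphBuilder hunks above all converge on the same SLF4J idiom: a static Logger obtained
from LoggerFactory, "{}" placeholders instead of string concatenation, and an isDebugEnabled()
guard around debug-level calls. A minimal, self-contained sketch of that pattern (the class and
method names below are illustrative only, not taken from this commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {

        private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

        public void connect(String upStream, String downStream) {
            // The guard skips argument evaluation entirely when DEBUG is disabled;
            // the {} placeholders defer message formatting to the logging backend.
            if (LOG.isDebugEnabled()) {
                LOG.debug("CONNECTED: {} -> {}", upStream, downStream);
            }
        }
    }

Because the placeholders are only expanded when the level is enabled, the removed String.format()
and concatenation calls can be dropped without paying any formatting cost on the disabled path.
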
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index ab6caea..42a2683 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
 public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
-	
+
 	OutputSelector<OUT> outputSelector;
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> selectAllOutputs;
 	private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
@@ -88,9 +88,8 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 					.get(outputName);
 			if (outputList == null) {
 				if (LOG.isErrorEnabled()) {
-					LOG.error(String.format(
-							"Cannot emit because no output is selected with the name: %s",
-							outputName));
+					LOG.error("Cannot emit because no output is selected with the name: {}",
+							outputName);
 				}
 			}
 
@@ -110,8 +109,8 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 				}
 			} catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
-					LOG.error(String.format("Emit to %s failed due to: %s", outputName,
-							StringUtils.stringifyException(e)));
+					LOG.error("Emit to {} failed due to: {}", outputName,
+							StringUtils.stringifyException(e));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 54cab72..7f38df1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -59,7 +59,7 @@ public class StreamCollector<OUT> implements Collector<OUT> {
 	public StreamCollector(int channelID,
 			SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
 		this.serializationDelegate = serializationDelegate;
-		
+
 		if (serializationDelegate != null) {
 			this.streamRecord = serializationDelegate.getInstance();
 		} else {
@@ -104,7 +104,7 @@ public class StreamCollector<OUT> implements Collector<OUT> {
 			}
 		}
 	}
-	
+
 	/**
 	 * Collects and emits a tuple/object to the outputs by reusing a
 	 * StreamRecord object.
@@ -129,15 +129,14 @@ public class StreamCollector<OUT> implements Collector<OUT> {
 		serializationDelegate.setInstance(streamRecord);
 		emitToOutputs();
 	}
-	
+
 	protected void emitToOutputs() {
 		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
 			try {
 				output.emit(serializationDelegate);
 			} catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
-					LOG.error(String.format("Emit failed due to: %s",
-							StringUtils.stringifyException(e)));
+					LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index b2fcf89..b20e832 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -71,7 +71,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	@Override
 	public void execute() {
 		if (LOG.isInfoEnabled()) {
-			LOG.info("Running remotely at " + host + ":" + port);
+			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
 		JobGraph jobGraph = jobGraphBuilder.getJobGraph();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
index 799f647..558d11c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
@@ -102,8 +102,8 @@ public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<O
 			callUserFunction();
 		} catch (Exception e) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error(String.format("Calling user function failed due to: %s",
-						StringUtils.stringifyException(e)));
+				LOG.error("Calling user function failed due to: {}",
+						StringUtils.stringifyException(e));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index c21e784..b064df7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -26,6 +24,8 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 
@@ -34,7 +34,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 	}
 
 	private static final long serialVersionUID = 1L;
-	private static final Log LOG = LogFactory.getLog(CoInvokable.class);
+	private static final Logger LOG = LoggerFactory.getLogger(CoInvokable.class);
 
 	protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
 	protected StreamRecord<IN1> reuse1;
@@ -120,8 +120,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 			callUserFunction1();
 		} catch (Exception e) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error(String.format("Calling user function failed due to: %s",
-						StringUtils.stringifyException(e)));
+				LOG.error("Calling user function failed due to: {}",
+						StringUtils.stringifyException(e));
 			}
 		}
 	}
@@ -131,8 +131,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 			callUserFunction2();
 		} catch (Exception e) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error(String.format("Calling user function failed due to: %s",
-						StringUtils.stringifyException(e)));
+				LOG.error("Calling user function failed due to: {}",
+						StringUtils.stringifyException(e));
 			}
 		}
 	}

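The error paths in CoInvokable and StreamOperatorInvokable keep StringUtils.stringifyException(e)
as a formatting argument. For reference, SLF4J also treats a trailing Throwable specially and
prints the stack trace itself; a minimal sketch of that alternative (class and method names here
are illustrative only):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExceptionLoggingSketch {

        private static final Logger LOG = LoggerFactory.getLogger(ExceptionLoggingSketch.class);

        public void handleFailure(Exception e) {
            // A Throwable passed as the last argument is not matched against a {} placeholder;
            // SLF4J appends its stack trace to the log entry instead.
            LOG.error("Calling user function failed", e);
        }
    }
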
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
index 76277dc..7382d7d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -35,9 +33,11 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.StreamRecordWriter;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.types.TypeInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class OutputHandler<OUT> {
-	private static final Log LOG = LogFactory.getLog(OutputHandler.class);
+	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
 
 	private AbstractStreamComponent streamComponent;
 	private StreamConfig configuration;
@@ -128,14 +128,14 @@ public class OutputHandler<OUT> {
 		outputs.add(output);
 		List<String> outputName = configuration.getOutputName(outputNumber);
 		boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
-		
+
 		if (collector != null) {
 			collector.addOutput(output, outputName, isSelectAllOutput);
 		}
 
 		if (LOG.isTraceEnabled()) {
-			LOG.trace("Partitioner set: " + outputPartitioner.getClass().getSimpleName() + " with "
-					+ outputNumber + " outputs");
+			LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
+					.getSimpleName(), outputNumber);
 		}
 	}
 
@@ -153,11 +153,11 @@ public class OutputHandler<OUT> {
 
 	long startTime;
 
-	public void invokeUserFunction(String componentTypeName,
-			StreamInvokable<OUT> userInvokable) throws IOException, InterruptedException {
+	public void invokeUserFunction(String componentTypeName, StreamInvokable<OUT> userInvokable)
+			throws IOException, InterruptedException {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug(componentTypeName + " " + streamComponent.getName()
-					+ " invoked with instance id " + streamComponent.getInstanceID());
+			LOG.debug("{} {} invoked with instance id {}", componentTypeName,
+					streamComponent.getName(), streamComponent.getInstanceID());
 		}
 
 		initializeOutputSerializers();
@@ -170,8 +170,8 @@ public class OutputHandler<OUT> {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug(componentTypeName + " " + streamComponent.getName()
-					+ " invoke finished with instance id " + streamComponent.getInstanceID());
+			LOG.debug("{} {} invoke finished with instance id {}", componentTypeName,
+					streamComponent.getName(), streamComponent.getInstanceID());
 		}
 
 		flushOutputs();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 8a1a637..40c4b96 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -27,13 +27,12 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
 import org.apache.flink.util.StringUtils;
 
-public class StreamIterationSink<IN extends Tuple> extends
-		AbstractStreamComponent {
+public class StreamIterationSink<IN extends Tuple> extends AbstractStreamComponent {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSink.class);
 
 	private InputHandler<IN> inputHandler;
-	
+
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
@@ -61,13 +60,13 @@ public class StreamIterationSink<IN extends Tuple> extends
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK " + getName() + " invoked");
+			LOG.debug("SINK {} invoked", getName());
 		}
 
 		forwardRecords();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK " + getName() + " invoke finished");
+			LOG.debug("SINK {} invoke finished", getName());
 		}
 	}
 
@@ -92,8 +91,8 @@ public class StreamIterationSink<IN extends Tuple> extends
 			}
 		} catch (InterruptedException e) {
 			if (LOG.isErrorEnabled()) {
-				LOG.error(String.format("Pushing back record at iteration %s failed due to: %s",
-						iterationId, StringUtils.stringifyException(e)));
+				LOG.error("Pushing back record at iteration {} failed due to: {}", iterationId,
+						StringUtils.stringifyException(e));
 			}
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index ab02d84..3affc8c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -68,7 +68,7 @@ public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComp
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE " + getName() + " invoked with instance id " + getInstanceID());
+			LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
 		}
 
 		outputHandler.initializeOutputSerializers();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 0797cc1..2754c89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -48,13 +48,13 @@ public class StreamSink<IN> extends AbstractStreamComponent {
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK " + getName() + " invoked");
+			LOG.debug("SINK {} invoked", getName());
 		}
 
 		invokeUserFunction(userInvokable);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK " + getName() + " invoke finished");
+			LOG.debug("SINK {} invoke finished", getName());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
index 53eafaa..52407bc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
@@ -66,7 +66,7 @@ public class TestDataUtil {
 
 		if (file.exists()) {
 			if (LOG.isInfoEnabled()) {
-				LOG.info(fileName + " already exists.");
+				LOG.info("{} already exists.", fileName);
 			}
 
 			try {
@@ -93,8 +93,10 @@ public class TestDataUtil {
 	}
 
 	public static void download(String fileName) {
-		LOG.info("downloading " + fileName);
-
+		if (LOG.isInfoEnabled()) {
+			LOG.info("downloading {}", fileName);
+		}
+
 		try {
 			URL website = new URL(testRepoUrl + fileName);
 			BufferedReader bReader = new BufferedReader(new InputStreamReader(website.openStream()));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index a06ceca..9c9f00d 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -42,6 +42,12 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-connectors</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0dd5138/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
deleted file mode 100644
index d39996f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/function/JSONParseFlatMap.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.function;
-
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * Abstract class derived from {@link RichFlatMapFunction} to handle JSON files.
- * 
- * @param <IN>
- *            Type of the input elements.
- * @param <OUT>
- *            Type of the returned elements.
- */
-public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	// private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
-
-	/**
-	 * Get the value object associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public Object get(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).get("retValue");
-	}
-
-	/**
-	 * Get the boolean value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public boolean getBoolean(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getBoolean("retValue");
-	}
-
-	/**
-	 * Get the double value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public double getDouble(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getDouble("retValue");
-	}
-
-	/**
-	 * Get the int value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public int getInt(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getInt("retValue");
-	}
-
-	/**
-	 * Get the long value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public long getLong(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getLong("retValue");
-	}
-	
-	/**
-	 * Get the String value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public String getString(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-		
-		return parser.parse(field).getString("retValue");
-	}
-}