You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/03/22 18:35:25 UTC

[1/4] flink git commit: [FLINK-1688] [streaming] Socket client sink added

Repository: flink
Updated Branches:
  refs/heads/master 244e5d5f8 -> 35f34162a


[FLINK-1688] [streaming] Socket client sink added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3bc7852
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3bc7852
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3bc7852

Branch: refs/heads/master
Commit: a3bc785232d3071ac5cb76ab20db9c30a3d2a22a
Parents: 244e5d5
Author: miguel0afd <ma...@stratio.com>
Authored: Sat Mar 14 23:54:38 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 08:46:23 2015 +0100

----------------------------------------------------------------------
 .../connectors/socket/SocketClientSink.java     | 141 +++++++++++++++++++
 1 file changed, 141 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3bc7852/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
new file mode 100644
index 0000000..72ba4f2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.socket;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. The data is sent to a Socket.
+ *
+ * @param <IN> data to be written into the Socket.
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+    /**
+     * Class logger
+     */
+	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+	private final String hostName;
+	private final int port;
+	private final SerializationSchema<IN, byte[]> scheme;
+	private transient Socket client;
+	private transient DataOutputStream dataOutputStream;
+
+    /**
+     * Default constructor.
+     *
+     * @param hostName Host of the Socket server.
+     * @param port Port of the Socket.
+     * @param schema Schema of the data.
+     */
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+		this.hostName = hostName;
+		this.port = port;
+		this.scheme = schema;
+	}
+
+	/**
+	 * Initializes the connection to Socket.
+	 */
+	public void intializeConnection() {
+		OutputStream outputStream;
+		try {
+			client = new Socket(hostName, port);
+			outputStream = client.getOutputStream();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		dataOutputStream = new DataOutputStream(outputStream);
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Socket.
+	 *
+	 * @param value
+	 *			The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		byte[] msg = scheme.serialize(value);
+		try {
+			dataOutputStream.write(msg);
+		} catch (IOException e) {
+			if(LOG.isErrorEnabled()){
+				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection of the Socket client.
+	 */
+	private void closeConnection(){
+		try {
+			client.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing connection with socket server at "
+					+ hostName + ":" + port, e);
+		} finally {
+            if (client != null) {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    LOG.error("Cannot close connection with socket server at "
+                            + hostName + ":" + port, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Initialize the connection with the Socket in the server.
+     * @param parameters Configuration.
+     */
+	@Override
+	public void open(Configuration parameters) {
+		intializeConnection();
+	}
+
+    /**
+     * Closes the connection with the Socket server.
+     */
+	@Override
+	public void close() {
+		closeConnection();
+	}
+
+    /**
+     * Closes the connection with the Socket server.
+     */
+	@Override
+	public void cancel() {
+		close();
+	}
+
+}


[2/4] flink git commit: [FLINK-1688] [streaming] [api-extending] Socket Client Sink added to the DataStream API

Posted by mb...@apache.org.
[FLINK-1688] [streaming] [api-extending] Socket Client Sink added to the DataStream API

Moved Serialization Schemas from connectors to core
Minor cleanups for Socket Client Sink
This closes #484


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11796495
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11796495
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11796495

Branch: refs/heads/master
Commit: 11796495762893f2c8b1f744321d0c4b5db0b881
Parents: a3bc785
Author: mbalassi <mb...@apache.org>
Authored: Sun Mar 22 10:27:30 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 12:11:05 2015 +0100

----------------------------------------------------------------------
 .../flink-streaming-connectors/pom.xml          |   2 +-
 .../streaming/connectors/ConnectorSource.java   |   2 +-
 .../streaming/connectors/flume/FlumeSink.java   |   8 +-
 .../streaming/connectors/flume/FlumeSource.java |   2 +-
 .../connectors/flume/FlumeTopology.java         |   4 +-
 .../connectors/kafka/KafkaConsumerExample.java  |   2 +-
 .../connectors/kafka/KafkaProducerExample.java  |   2 +-
 .../kafka/KafkaSimpleConsumerExample.java       |   2 +-
 .../connectors/kafka/api/KafkaSink.java         |  10 +-
 .../connectors/kafka/api/KafkaSource.java       |   2 +-
 .../kafka/api/simple/KafkaTopicUtils.java       |   2 -
 .../kafka/api/simple/PersistentKafkaSource.java |   2 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |   8 +-
 .../connectors/rabbitmq/RMQSource.java          |   2 +-
 .../connectors/rabbitmq/RMQTopology.java        |   4 +-
 .../connectors/socket/SocketClientSink.java     | 141 -------------------
 .../connectors/util/DeserializationSchema.java  |  42 ------
 .../util/JavaDefaultStringSchema.java           |  41 ------
 .../streaming/connectors/util/RawSchema.java    |  39 -----
 .../connectors/util/SerializationSchema.java    |  33 -----
 .../connectors/util/SimpleStringSchema.java     |  40 ------
 .../streaming/connectors/kafka/KafkaITCase.java |   2 +-
 .../streaming/api/datastream/DataStream.java    |  16 +++
 .../api/function/sink/SocketClientSink.java     | 138 ++++++++++++++++++
 .../serialization/DeserializationSchema.java    |  42 ++++++
 .../serialization/JavaDefaultStringSchema.java  |  41 ++++++
 .../streaming/util/serialization/RawSchema.java |  39 +++++
 .../util/serialization/SerializationSchema.java |  33 +++++
 .../util/serialization/SimpleStringSchema.java  |  40 ++++++
 .../flink/streaming/api/scala/DataStream.scala  |   8 ++
 30 files changed, 384 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 4408ba0..8155100 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -35,7 +35,7 @@ under the License.
 
 	<packaging>jar</packaging>
 
-	<!-- Allow users to pass custom kafka versions -->
+	<!-- Allow users to pass custom connector versions -->
 	<properties>
 		<kafka.version>0.8.2.0</kafka.version>
 		<rabbitmq.version>3.3.1</rabbitmq.version>

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index 1623943..a7b0b06 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
 		GenericSourceFunction<OUT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 86fd1b1..27074ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.flume;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -39,12 +39,12 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	boolean initDone = false;
 	String host;
 	int port;
-	SerializationSchema<IN, byte[]> scheme;
+	SerializationSchema<IN, byte[]> schema;
 
 	public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
 		this.host = host;
 		this.port = port;
-		this.scheme = schema;
+		this.schema = schema;
 	}
 
 	/**
@@ -57,7 +57,7 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN value) {
 
-		byte[] data = scheme.serialize(value);
+		byte[] data = schema.serialize(value);
 		client.sendDataToFlume(data);
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 2a321a2..00661ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.apache.flume.Context;
 import org.apache.flume.channel.ChannelProcessor;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 3cfd7d4..7c979d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 public class FlumeTopology {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index dd1221d..6a95b0c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 1fe759a..e7abf11 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -21,7 +21,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;
 
 public class KafkaProducerExample {

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
index 47c5a33..2fd31ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 
 public class KafkaSimpleConsumerExample {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 1753561..be9eb57 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.NetUtils;
 
 import com.google.common.base.Preconditions;
@@ -50,7 +50,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private Properties props;
 	private String topicId;
 	private String zookeeperAddress;
-	private SerializationSchema<IN, byte[]> scheme;
+	private SerializationSchema<IN, byte[]> schema;
 	private SerializableKafkaPartitioner partitioner;
 	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
 
@@ -91,7 +91,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 
 		this.zookeeperAddress = zookeeperAddress;
 		this.topicId = topicId;
-		this.scheme = serializationSchema;
+		this.schema = serializationSchema;
 		this.partitioner = partitioner;
 	}
 
@@ -103,7 +103,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 
 		this.zookeeperAddress = zookeeperAddress;
 		this.topicId = topicId;
-		this.scheme = serializationSchema;
+		this.schema = serializationSchema;
 		this.partitionerClass = partitioner;
 	}
 
@@ -150,7 +150,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 */
 	@Override
 	public void invoke(IN next) {
-		byte[] serialized = scheme.serialize(next);
+		byte[] serialized = schema.serialize(next);
 		producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4eff870..ae6c169 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -31,7 +31,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
index 7a10ed3..9e09ea8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
@@ -82,8 +82,6 @@ public class KafkaTopicUtils {
 
 		Broker leader = JavaConversions.asJavaCollection(partitionMetadata.isr()).iterator().next();
 
-		// TODO for Kafka version 8.2.0
-		//		return leader.connectionString();
 		return leader.connectionString();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 8ec298a..97225dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffs
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index dae9c6d..48f5e60 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,12 +39,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	private transient ConnectionFactory factory;
 	private transient Connection connection;
 	private transient Channel channel;
-	private SerializationSchema<IN, byte[]> scheme;
+	private SerializationSchema<IN, byte[]> schema;
 
 	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
 		this.HOST_NAME = HOST_NAME;
 		this.QUEUE_NAME = QUEUE_NAME;
-		this.scheme = schema;
+		this.schema = schema;
 	}
 
 	/**
@@ -72,7 +72,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN value) {
 		try {
-			byte[] msg = scheme.serialize(value);
+			byte[] msg = schema.serialize(value);
 
 			channel.basicPublish("", QUEUE_NAME, null, msg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 12ad3d6..03b6d10 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index a6ca9ae..0f06235 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 public class RMQTopology {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
deleted file mode 100644
index 72ba4f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *	http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.socket;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Socket client that acts as a streaming sink. The data is sent to a Socket.
- *
- * @param <IN> data to be written into the Socket.
- */
-public class SocketClientSink<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-    /**
-     * Class logger
-     */
-	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
-	private final String hostName;
-	private final int port;
-	private final SerializationSchema<IN, byte[]> scheme;
-	private transient Socket client;
-	private transient DataOutputStream dataOutputStream;
-
-    /**
-     * Default constructor.
-     *
-     * @param hostName Host of the Socket server.
-     * @param port Port of the Socket.
-     * @param schema Schema of the data.
-     */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
-		this.hostName = hostName;
-		this.port = port;
-		this.scheme = schema;
-	}
-
-	/**
-	 * Initializes the connection to Socket.
-	 */
-	public void intializeConnection() {
-		OutputStream outputStream;
-		try {
-			client = new Socket(hostName, port);
-			outputStream = client.getOutputStream();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-		dataOutputStream = new DataOutputStream(outputStream);
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Socket.
-	 *
-	 * @param value
-	 *			The incoming data
-	 */
-	@Override
-	public void invoke(IN value) {
-		byte[] msg = scheme.serialize(value);
-		try {
-			dataOutputStream.write(msg);
-		} catch (IOException e) {
-			if(LOG.isErrorEnabled()){
-				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
-			}
-		}
-	}
-
-	/**
-	 * Closes the connection of the Socket client.
-	 */
-	private void closeConnection(){
-		try {
-			client.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Error while closing connection with socket server at "
-					+ hostName + ":" + port, e);
-		} finally {
-            if (client != null) {
-                try {
-                    client.close();
-                } catch (IOException e) {
-                    LOG.error("Cannot close connection with socket server at "
-                            + hostName + ":" + port, e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Initialize the connection with the Socket in the server.
-     * @param parameters Configuration.
-     */
-	@Override
-	public void open(Configuration parameters) {
-		intializeConnection();
-	}
-
-    /**
-     * Closes the connection with the Socket server.
-     */
-	@Override
-	public void close() {
-		closeConnection();
-	}
-
-    /**
-     * Closes the connection with the Socket server.
-     */
-	@Override
-	public void cancel() {
-		close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
deleted file mode 100644
index 4507a1d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface DeserializationSchema<T> extends Serializable {
-
-	/**
-	 * Deserializes the incoming data.
-	 * 
-	 * @param message
-	 *            The incoming message in a byte array
-	 * @return The deserialized message in the required format.
-	 */
-	public T deserialize(byte[] message);
-
-	/**
-	 * Method to decide whether the element signals the end of the stream. If
-	 * true is returned the element won't be emitted
-	 * 
-	 * @param nextElement
-	 *            The element to test for end signal
-	 * @return The end signal, if true the stream shuts down
-	 */
-	public boolean isEndOfStream(T nextElement);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
deleted file mode 100644
index 569d3e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import org.apache.commons.lang3.SerializationUtils;
-
-public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public boolean isEndOfStream(String nextElement) {
-		return nextElement.equals("q");
-	}
-
-	@Override
-	public byte[] serialize(String element) {
-		return SerializationUtils.serialize(element);
-	}
-
-	@Override
-	public String deserialize(byte[] message) {
-		return SerializationUtils.deserialize(message);
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
deleted file mode 100644
index 29c749a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class RawSchema implements DeserializationSchema<byte[]>,
-		SerializationSchema<byte[], byte[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public byte[] deserialize(byte[] message) {
-		return message;
-	}
-
-	@Override
-	public boolean isEndOfStream(byte[] nextElement) {
-		return false;
-	}
-
-	@Override
-	public byte[] serialize(byte[] element) {
-		return element;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
deleted file mode 100644
index f8d2b2f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface SerializationSchema<T, R> extends Serializable {
-
-	/**
-	 * Serializes the incoming element to a specified type.
-	 * 
-	 * @param element
-	 *            The incoming element to be serialized
-	 * @return The serialized element.
-	 */
-	public R serialize(T element);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
deleted file mode 100644
index 4b21580..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class SimpleStringSchema implements DeserializationSchema<String>,
-		SerializationSchema<String, String> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public String deserialize(byte[] message) {
-		return new String(message);
-	}
-
-	@Override
-	public boolean isEndOfStream(String nextElement) {
-		return false;
-	}
-
-	@Override
-	public String serialize(String element) {
-		return element;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 134525d..ab786ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -36,7 +36,7 @@ import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b62a6d8..5768101 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.function.sink.FileSinkFunctionByMillis;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SocketClientSink;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
@@ -80,6 +81,7 @@ import org.apache.flink.streaming.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -1115,6 +1117,20 @@ public class DataStream<OUT> {
 		return writeToFile((OutputFormat<OUT>) of, millis);
 	}
 
+	/**
+	 * Writes the DataStream to a socket as a byte array. The format of the output is
+	 * specified by a {@link SerializationSchema}.
+	 *
+	 * @param hostName host of the socket
+	 * @param port port of the socket
+	 * @param schema schema for serialization
+	 * @return the closed DataStream
+	 */
+	public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema){
+		DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
+		return returnStream;
+	}
+
 	private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
 		DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
new file mode 100644
index 0000000..6ebcf46
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
+ *
+ * @param <IN> data to be written into the Socket.
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+	private final String hostName;
+	private final int port;
+	private final SerializationSchema<IN, byte[]> schema;
+	private transient Socket client;
+	private transient DataOutputStream dataOutputStream;
+
+	/**
+	 * Default constructor.
+	 *
+	 * @param hostName Host of the Socket server.
+	 * @param port Port of the Socket.
+	 * @param schema Schema of the data.
+	 */
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+		this.hostName = hostName;
+		this.port = port;
+		this.schema = schema;
+	}
+
+	/**
+	 * Initializes the connection to Socket.
+	 */
+	public void intializeConnection() {
+		OutputStream outputStream;
+		try {
+			client = new Socket(hostName, port);
+			outputStream = client.getOutputStream();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		dataOutputStream = new DataOutputStream(outputStream);
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Socket.
+	 *
+	 * @param value
+	 *			The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		byte[] msg = schema.serialize(value);
+		try {
+			dataOutputStream.write(msg);
+		} catch (IOException e) {
+			if(LOG.isErrorEnabled()){
+				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection of the Socket client.
+	 */
+	private void closeConnection(){
+		try {
+			dataOutputStream.flush();
+			client.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing connection with socket server at "
+					+ hostName + ":" + port, e);
+		} finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (IOException e) {
+					LOG.error("Cannot close connection with socket server at "
+							+ hostName + ":" + port, e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Initialize the connection with the Socket in the server.
+	 * @param parameters Configuration.
+	 */
+	@Override
+	public void open(Configuration parameters) {
+		intializeConnection();
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void close() {
+		closeConnection();
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void cancel() {
+		close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
new file mode 100644
index 0000000..08ef461
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+public interface DeserializationSchema<T> extends Serializable {
+
+	/**
+	 * Deserializes the incoming data.
+	 * 
+	 * @param message
+	 *            The incoming message in a byte array
+	 * @return The deserialized message in the required format.
+	 */
+	public T deserialize(byte[] message);
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted
+	 * 
+	 * @param nextElement
+	 *            The element to test for end signal
+	 * @return The end signal, if true the stream shuts down
+	 */
+	public boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
new file mode 100644
index 0000000..93d13ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isEndOfStream(String nextElement) {
+		return nextElement.equals("q");
+	}
+
+	@Override
+	public byte[] serialize(String element) {
+		return SerializationUtils.serialize(element);
+	}
+
+	@Override
+	public String deserialize(byte[] message) {
+		return SerializationUtils.deserialize(message);
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
new file mode 100644
index 0000000..e457bef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+public class RawSchema implements DeserializationSchema<byte[]>,
+		SerializationSchema<byte[], byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public byte[] deserialize(byte[] message) {
+		return message;
+	}
+
+	@Override
+	public boolean isEndOfStream(byte[] nextElement) {
+		return false;
+	}
+
+	@Override
+	public byte[] serialize(byte[] element) {
+		return element;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
new file mode 100644
index 0000000..8124eb0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+public interface SerializationSchema<T, R> extends Serializable {
+
+	/**
+	 * Serializes the incoming element to a specified type.
+	 * 
+	 * @param element
+	 *            The incoming element to be serialized
+	 * @return The serialized element.
+	 */
+	public R serialize(T element);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
new file mode 100644
index 0000000..3d0a0d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+public class SimpleStringSchema implements DeserializationSchema<String>,
+		SerializationSchema<String, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public String deserialize(byte[] message) {
+		return new String(message);
+	}
+
+	@Override
+	public boolean isEndOfStream(String nextElement) {
+		return false;
+	}
+
+	@Override
+	public String serialize(String element) {
+		return element;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3dc54d6..7d77b5b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
   SingleOutputStreamOperator, GroupedDataStream}
+import org.apache.flink.streaming.util.serialization.SerializationSchema
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -605,6 +606,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream.writeAsCsv(path, millis)
 
   /**
+   * Writes the DataStream to a socket as a byte array. The format of the output is
+   * specified by a {@link SerializationSchema}.
+   */
+  def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T, Array[Byte]]):
+    DataStream[T] = javaStream.writeToSocket(hostname, port, schema)
+
+  /**
    * Adds the given sink to this DataStream. Only streams with sinks added
    * will be executed once the StreamExecutionEnvironment.execute(...)
    * method is called.


[4/4] flink git commit: [FLINK-1763] [streaming] Remove cancel from SinkFunction

Posted by mb...@apache.org.
[FLINK-1763] [streaming] Remove cancel from SinkFunction

This closes #513


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35f34162
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35f34162
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35f34162

Branch: refs/heads/master
Commit: 35f34162a1ba3bf3fd590d7b0e59011ad9f6160f
Parents: 2842e2f
Author: Gyula Fora <gy...@apache.org>
Authored: Sun Mar 22 13:44:27 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 16:44:09 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   |  7 ----
 .../connectors/kafka/api/KafkaSink.java         |  6 +--
 .../connectors/kafka/api/KafkaSource.java       |  1 +
 .../api/simple/offset/BeginningOffset.java      |  2 +
 .../kafka/api/simple/offset/CurrentOffset.java  |  2 +
 .../kafka/api/simple/offset/GivenOffset.java    |  1 +
 .../kafka/api/simple/offset/KafkaOffset.java    |  2 +
 .../streaming/connectors/rabbitmq/RMQSink.java  |  5 ---
 .../connectors/twitter/TwitterStreaming.java    |  4 --
 .../streaming/connectors/kafka/KafkaITCase.java |  7 ++--
 .../api/function/sink/FileSinkFunction.java     |  9 ----
 .../api/function/sink/PrintSinkFunction.java    |  6 ---
 .../api/function/sink/SinkFunction.java         |  7 ----
 .../api/function/sink/SocketClientSink.java     |  8 ----
 .../sink/WriteSinkFunctionByMillis.java         |  5 ---
 .../streaming/api/invokable/SinkInvokable.java  |  7 ----
 .../api/streamvertex/StreamVertex.java          | 43 +++++++++++++++-----
 .../streaming/io/SpillingBufferOrEvent.java     |  1 +
 .../apache/flink/streaming/api/IterateTest.java |  5 ---
 .../api/collector/DirectedOutputTest.java       |  6 +--
 .../windowing/WindowIntegrationTest.java        | 40 ------------------
 .../api/streamvertex/StreamVertexTest.java      | 10 +----
 .../streaming/util/TestListResultSink.java      |  6 ---
 .../flink/streaming/api/scala/DataStream.scala  |  1 -
 .../StreamCheckpointingITCase.java              |  4 +-
 .../test/classloading/jar/StreamingProgram.java |  4 --
 26 files changed, 50 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 27074ec..8112159 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -134,13 +134,6 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	}
 
 	@Override
-	public void cancel() {
-		if (client != null) {
-			client.client.close();
-		}
-	}
-
-	@Override
 	public void open(Configuration config) {
 		client = new FlinkRpcClientFacade();
 		client.init(host, port);

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index be9eb57..f1dbc8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -65,6 +65,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * @param serializationSchema
 	 * 		User defined serialization schema.
 	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public KafkaSink(String zookeeperAddress, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema) {
 		this(zookeeperAddress, topicId, serializationSchema, (Class) null);
@@ -161,9 +162,4 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		}
 	}
 
-	@Override
-	public void cancel() {
-		close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index ae6c169..1aa834d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -46,6 +46,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
+	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
 
 	private final String zookeeperAddress;

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
index f7096ad..15e7b36 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
@@ -22,6 +22,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public class BeginningOffset extends KafkaOffset {
 
+	private static final long serialVersionUID = 1L;
+
 	@Override
 	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
 		return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName);

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
index 3555ff9..6119f32 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
@@ -22,6 +22,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public class CurrentOffset extends KafkaOffset {
 
+	private static final long serialVersionUID = 1L;
+
 	@Override
 	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
 		return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
index 1282125..fef6325 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
@@ -21,6 +21,7 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public class GivenOffset extends KafkaOffset {
 
+	private static final long serialVersionUID = 1L;
 	private final long offset;
 
 	public GivenOffset(long offset) {

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
index c048ba1..4dfd314 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
@@ -28,6 +28,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public abstract class KafkaOffset implements Serializable {
 
+	private static final long serialVersionUID = 1L;
+
 	public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
 			String clientName);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 48f5e60..53db1c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -108,9 +108,4 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		closeChannel();
 	}
 
-	@Override
-	public void cancel() {
-		close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 9be27eb..a32fe1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -47,10 +47,6 @@ public class TwitterStreaming {
 			System.out.println("");
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	public static class SelectDataFlatMap extends

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index ab786ee..a094b89 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -154,14 +154,11 @@ public class KafkaITCase {
 					throw new SuccessException();
 				}
 			}
-
-			@Override
-			public void cancel() {
-			}
 		});
 
 		// add producing topology
 		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+			private static final long serialVersionUID = 1L;
 			boolean running = true;
 
 			@Override
@@ -247,6 +244,8 @@ public class KafkaITCase {
 
 	public static class SuccessException extends Exception {
 
+		private static final long serialVersionUID = 1L;
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
index 5468494..24beba1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -115,13 +115,4 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
 	 */
 	protected abstract void resetParameters();
 
-	@Override
-	public void cancel() {
-		try {
-			close();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 0fa37ac..9ff8a7f 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -93,10 +93,4 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
 	public String toString() {
 		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
 	}
-
-	@Override
-	public void cancel() {
-		close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index ffa5a67..d4ce24e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -35,11 +35,4 @@ public interface SinkFunction<IN> extends Function, Serializable {
 	 * @throws Exception
 	 */
 	public void invoke(IN value) throws Exception;
-
-	/**
-	 * In case another vertex in topology fails this method is called before terminating
-	 * the sink. Make sure to free up any allocated resources here.
-	 */
-	public void cancel();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
index 6ebcf46..c582c1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
@@ -127,12 +127,4 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 		closeConnection();
 	}
 
-	/**
-	 * Closes the connection with the Socket server.
-	 */
-	@Override
-	public void cancel() {
-		close();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index 53030f4..ee6df94 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -47,9 +47,4 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
 		lastTime = System.currentTimeMillis();
 	}
 
-	@Override
-	public void cancel() {
-		// No cleanup needed
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 01a295a..29d2ed2 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -40,11 +40,4 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
 	protected void callUserFunction() throws Exception {
 		sinkFunction.invoke(nextObject);
 	}
-
-	@Override
-	public void cancel() {
-		super.cancel();
-		sinkFunction.cancel();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index dd8a463..ae2ebdd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -29,9 +29,9 @@ import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.jobmanager.BarrierAck;
 import org.apache.flink.runtime.jobmanager.StateBarrierAck;
 import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -152,32 +152,36 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	@Override
 	public void invoke() throws Exception {
 
+		boolean operatorOpen = false;
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
 		}
 
 		try {
 			userInvokable.setRuntimeContext(context);
-			userInvokable.open(getTaskConfiguration());
 
-			for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-				invokable.setRuntimeContext(context);
-				invokable.open(getTaskConfiguration());
-			}
+			operatorOpen = true;
+			openOperator();
 
 			userInvokable.invoke();
 
-			userInvokable.close();
-
-			for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-				invokable.close();
-			}
+			closeOperator();
+			operatorOpen = false;
 
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
 			}
 
 		} catch (Exception e) {
+
+			if (operatorOpen) {
+				try {
+					closeOperator();
+				} catch (Throwable t) {
+				}
+			}
+
 			if (LOG.isErrorEnabled()) {
 				LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e));
 			}
@@ -190,6 +194,23 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
 	}
 
+	protected void openOperator() throws Exception {
+		userInvokable.open(getTaskConfiguration());
+
+		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+			invokable.setRuntimeContext(context);
+			invokable.open(getTaskConfiguration());
+		}
+	}
+
+	protected void closeOperator() throws Exception {
+		userInvokable.close();
+
+		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+			invokable.close();
+		}
+	}
+
 	protected void clearBuffers() throws IOException {
 		if (outputHandler != null) {
 			outputHandler.clearWriters();

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
index 40713e2..38cc0c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
@@ -40,6 +40,7 @@ public class SpillingBufferOrEvent {
 			throws IOException {
 
 		this.boe = boe;
+		this.channelIndex = boe.getChannelIndex();
 		this.spillReader = reader;
 
 		if (shouldSpill()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 92d23aa..3f0c48a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -68,11 +68,6 @@ public class IterateTest {
 		@Override
 		public void invoke(Boolean tuple) {
 		}
-
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 62872fd..bc7fe73 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -88,9 +88,6 @@ public class DirectedOutputTest {
 			this.list = outputs.get(name);
 		}
 
-		@Override
-		public void cancel() {
-		}
 	}
 
 	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
@@ -115,6 +112,7 @@ public class DirectedOutputTest {
 		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
 		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
 				evenAndOddSink.getSortedResult());
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), allSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				allSink.getSortedResult());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 34986c8..f0b5500 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -286,10 +286,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -303,10 +299,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -320,10 +312,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -337,10 +325,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -354,10 +338,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -371,10 +351,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -388,10 +364,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -405,10 +377,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -422,10 +390,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 	@SuppressWarnings("serial")
@@ -439,10 +403,6 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
-
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index a88a60d..0c3ff83 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -83,9 +83,6 @@ public class StreamVertexTest {
 			data.put(k, v);
 		}
 
-		@Override
-		public void cancel() {
-		}
 	}
 
 	@SuppressWarnings("unused")
@@ -134,13 +131,13 @@ public class StreamVertexTest {
 
 		@Override
 		public String map1(String value) {
-//			System.out.println(value);
+			// System.out.println(value);
 			return value;
 		}
 
 		@Override
 		public String map2(Long value) {
-//			System.out.println(value);
+			// System.out.println(value);
 			return value.toString();
 		}
 	}
@@ -154,9 +151,6 @@ public class StreamVertexTest {
 			result.add(value);
 		}
 
-		@Override
-		public void cancel() {
-		}
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
index 8b78a42..87f290f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.util;
 
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeSet;
@@ -72,9 +71,4 @@ public class TestListResultSink<T> extends RichSinkFunction<T> {
 			return sortedList;
 		}
 	}
-
-	@Override
-	public void cancel() {
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 7d77b5b..59f86b8 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -634,7 +634,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val sinkFunction = new SinkFunction[T] {
       val cleanFun = clean(fun)
       def invoke(in: T) = cleanFun(in)
-      def cancel() = {}
     }
     this.addSink(sinkFunction)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 3accb11..12c6b41 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -178,9 +178,7 @@ public class StreamCheckpointingITCase {
 							}
 						}
 
-						@Override
-						public void cancel() {}
-					});
+			});
 
 			env.execute();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/35f34162/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 18b52c5..9a244a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -98,9 +98,5 @@ public class StreamingProgram {
 		@Override
 		public void invoke(Word value) throws Exception {
 		}
-
-		@Override
-		public void cancel() {
-		}
 	}
 }


[3/4] flink git commit: [FLINK-1756] [streaming] Rename Stream Monitoring to Stream Checkpointing

Posted by mb...@apache.org.
[FLINK-1756] [streaming] Rename Stream Monitoring to Stream Checkpointing

Also set the default checkpoint interval to 5 secs instead of 10.

This closes #506


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2842e2fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2842e2fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2842e2fd

Branch: refs/heads/master
Commit: 2842e2fd9893add426c42aa40436b898e40a1ffc
Parents: 1179649
Author: mbalassi <mb...@apache.org>
Authored: Fri Mar 20 16:06:56 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Sun Mar 22 16:35:06 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 30 ++++++++++----------
 .../flink/runtime/jobmanager/JobManager.scala   |  8 +++---
 .../apache/flink/streaming/api/StreamGraph.java | 20 ++++++-------
 .../api/StreamingJobGraphGenerator.java         |  6 ++--
 .../environment/StreamExecutionEnvironment.java | 10 +++----
 .../api/scala/StreamExecutionEnvironment.scala  |  8 +++---
 6 files changed, 41 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 46ec445..91d01a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -178,11 +178,11 @@ public class ExecutionGraph implements Serializable {
 
 	private ActorContext parentContext;
 
-	private  ActorRef stateMonitorActor;
+	private  ActorRef stateCheckpointerActor;
 
-	private boolean monitoringEnabled;
+	private boolean checkpointingEnabled;
 
-	private long monitoringInterval = 10000;
+	private long checkpointingInterval = 5000;
 
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
 		this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
@@ -223,12 +223,12 @@ public class ExecutionGraph implements Serializable {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public void setStateMonitorActor(ActorRef stateMonitorActor) {
-		this.stateMonitorActor = stateMonitorActor;
+	public void setStateCheckpointerActor(ActorRef stateCheckpointerActor) {
+		this.stateCheckpointerActor = stateCheckpointerActor;
 	}
 
-	public ActorRef getStateMonitorActor() {
-		return stateMonitorActor;
+	public ActorRef getStateCheckpointerActor() {
+		return stateCheckpointerActor;
 	}
 
 	public void setParentContext(ActorContext parentContext) {
@@ -289,12 +289,12 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-	public void setMonitoringEnabled(boolean monitoringEnabled) {
-		this.monitoringEnabled = monitoringEnabled;
+	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
+		this.checkpointingEnabled = checkpointingEnabled;
 	}
 
-	public void setMonitoringInterval(long  monitoringInterval) {
-		this.monitoringInterval = monitoringInterval;
+	public void setCheckpointingInterval(long checkpointingInterval) {
+		this.checkpointingInterval = checkpointingInterval;
 	}
 
 	/**
@@ -451,9 +451,9 @@ public class ExecutionGraph implements Serializable {
 					throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
 			}
 
-			if (monitoringEnabled) {
-				stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this,
-						Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
+			if (checkpointingEnabled) {
+				stateCheckpointerActor = StreamCheckpointCoordinator.spawn(parentContext, this,
+						Duration.create(checkpointingInterval, TimeUnit.MILLISECONDS));
 			}
 		}
 		else {
@@ -777,6 +777,6 @@ public class ExecutionGraph implements Serializable {
 		
 		scheduler = null;
 		parentContext = null;
-		stateMonitorActor = null;
+		stateCheckpointerActor = null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 70a1cbb..61a0aea 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -311,13 +311,13 @@ class JobManager(val configuration: Configuration,
     case msg: BarrierAck =>
       currentJobs.get(msg.jobID) match {
         case Some(jobExecution) =>
-          jobExecution._1.getStateMonitorActor forward  msg
+          jobExecution._1.getStateCheckpointerActor forward  msg
         case None =>
       }
     case msg: StateBarrierAck =>
       currentJobs.get(msg.jobID) match {
         case Some(jobExecution) =>
-          jobExecution._1.getStateMonitorActor forward  msg
+          jobExecution._1.getStateCheckpointerActor forward  msg
         case None =>
       }
       
@@ -487,8 +487,8 @@ class JobManager(val configuration: Configuration,
         executionGraph.setScheduleMode(jobGraph.getScheduleMode)
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
         
-        executionGraph.setMonitoringEnabled(jobGraph.isMonitoringEnabled)
-        executionGraph.setMonitoringInterval(jobGraph.getMonitorInterval)
+        executionGraph.setCheckpointingEnabled(jobGraph.isMonitoringEnabled)
+        executionGraph.setCheckpointingInterval(jobGraph.getMonitorInterval)
 
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create splits

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 71706bc..970ce49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -92,9 +92,9 @@ public class StreamGraph extends StreamingPlan {
 
 	private ExecutionConfig executionConfig;
 	
-	private boolean monitoringEnabled;
+	private boolean checkpointingEnabled;
 	
-	private long monitoringInterval = 10000;
+	private long checkpointingInterval = 5000;
 
 	public StreamGraph(ExecutionConfig executionConfig) {
 
@@ -559,20 +559,20 @@ public class StreamGraph extends StreamingPlan {
 		return executionConfig;
 	}
 
-	public void setMonitoringEnabled(boolean monitoringEnabled) {
-		this.monitoringEnabled = monitoringEnabled;
+	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
+		this.checkpointingEnabled = checkpointingEnabled;
 	}
 
-	public boolean isMonitoringEnabled() {
-		return monitoringEnabled;
+	public boolean isCheckpointingEnabled() {
+		return checkpointingEnabled;
 	}
 
-	public void setMonitoringInterval(long monitoringInterval) {
-		this.monitoringInterval = monitoringInterval;
+	public void setCheckpointingInterval(long checkpointingInterval) {
+		this.checkpointingInterval = checkpointingInterval;
 	}
 
-	public long getMonitoringInterval() {
-		return monitoringInterval;
+	public long getCheckpointingInterval() {
+		return checkpointingInterval;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 544ccc6..ad744d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -78,8 +78,8 @@ public class StreamingJobGraphGenerator {
 		// Turn lazy scheduling off
 		jobGraph.setScheduleMode(ScheduleMode.ALL);
 		jobGraph.setJobType(JobGraph.JobType.STREAMING);
-		jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
-		jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
+		jobGraph.setMonitoringEnabled(streamGraph.isCheckpointingEnabled());
+		jobGraph.setMonitorInterval(streamGraph.getCheckpointingInterval());
 
 		if(jobGraph.isMonitoringEnabled()) {
 			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
@@ -257,7 +257,7 @@ public class StreamingJobGraphGenerator {
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
-		config.setStateMonitoring(streamGraph.isMonitoringEnabled());
+		config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
 
 		Class<? extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexID);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2096745..994ff15 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -163,9 +163,9 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @param interval Time interval between state checkpoints in millis
 	 */
-	public StreamExecutionEnvironment enableMonitoring(long interval) {
-		streamGraph.setMonitoringEnabled(true);
-		streamGraph.setMonitoringInterval(interval);
+	public StreamExecutionEnvironment enableCheckpointing(long interval) {
+		streamGraph.setCheckpointingEnabled(true);
+		streamGraph.setCheckpointingInterval(interval);
 		return this;
 	}
 
@@ -178,8 +178,8 @@ public abstract class StreamExecutionEnvironment {
 	 * otherwise with calling with the {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
 	 * method in case of failure the job will be resubmitted to the cluster indefinitely.
 	 */
-	public StreamExecutionEnvironment enableMonitoring() {
-		streamGraph.setMonitoringEnabled(true);
+	public StreamExecutionEnvironment enableCheckpointing() {
+		streamGraph.setCheckpointingEnabled(true);
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2842e2fd/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 4672fca..2208388 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -84,8 +84,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
-  def enableMonitoring(interval : Long) : StreamExecutionEnvironment = {
-    javaEnv.enableMonitoring(interval)
+  def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
+    javaEnv.enableCheckpointing(interval)
     this
   }
 
@@ -98,8 +98,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
-  def enableMonitoring() : StreamExecutionEnvironment = {
-    javaEnv.enableMonitoring()
+  def enableCheckpointing() : StreamExecutionEnvironment = {
+    javaEnv.enableCheckpointing()
     this
   }