You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/16 09:56:40 UTC

flink git commit: [FLINK-1753] Extend KafkaITCase with large messages test + rework PersistentKafkaSource.

Repository: flink
Updated Branches:
  refs/heads/master 4754a97b1 -> 354922bed


[FLINK-1753] Extend KafkaITCase with large messages test + rework PersistentKafkaSource.

This closes #603


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/354922be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/354922be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/354922be

Branch: refs/heads/master
Commit: 354922bed7ffe5482513ac53b5ef56bfbca8bd9a
Parents: 4754a97
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Apr 13 16:23:58 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Apr 16 09:55:51 2015 +0200

----------------------------------------------------------------------
 .../common/functions/util/FunctionUtils.java    |  12 +-
 .../examples/java/wordcount/WordCount.java      |   2 +-
 .../typeutils/runtime/ByteArrayInputView.java   |  40 ++++++
 .../java/typeutils/runtime/TestInputView.java   |  40 ------
 .../streaming/connectors/ConnectorSource.java   |   4 +
 .../flink/streaming/connectors/kafka/Utils.java |  71 ++++++++++
 .../kafka/api/simple/PersistentKafkaSource.java |  97 ++++++++-----
 .../KafkaMultiplePartitionsIterator.java        |  14 +-
 .../iterator/KafkaSinglePartitionIterator.java  |  31 ++---
 .../streaming/connectors/kafka/KafkaITCase.java | 135 ++++++++++++++++++-
 .../functions/source/GenericSourceFunction.java |   2 +-
 .../streaming/api/operators/StreamOperator.java |   1 -
 .../serialization/DeserializationSchema.java    |   4 +-
 13 files changed, 334 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index 2486074..62e07ac 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -27,28 +27,28 @@ import org.apache.flink.configuration.Configuration;
 
 public class FunctionUtils {
 
-	public static void openFunction (Function function, Configuration parameters) throws Exception{
+	public static void openFunction(Function function, Configuration parameters) throws Exception{
 		if (function instanceof RichFunction) {
 			RichFunction richFunction = (RichFunction) function;
-			richFunction.open (parameters);
+			richFunction.open(parameters);
 		}
 	}
 
-	public static void closeFunction (Function function) throws Exception{
+	public static void closeFunction(Function function) throws Exception{
 		if (function instanceof RichFunction) {
 			RichFunction richFunction = (RichFunction) function;
-			richFunction.close ();
+			richFunction.close();
 		}
 	}
 
-	public static void setFunctionRuntimeContext (Function function, RuntimeContext context){
+	public static void setFunctionRuntimeContext(Function function, RuntimeContext context){
 		if (function instanceof RichFunction) {
 			RichFunction richFunction = (RichFunction) function;
 			richFunction.setRuntimeContext(context);
 		}
 	}
 
-	public static RuntimeContext getFunctionRuntimeContext (Function function, RuntimeContext defaultContext){
+	public static RuntimeContext getFunctionRuntimeContext(Function function, RuntimeContext defaultContext){
 		if (function instanceof RichFunction) {
 			RichFunction richFunction = (RichFunction) function;
 			return richFunction.getRuntimeContext();

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index bfd5e85..7db7946 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -63,7 +63,7 @@ public class WordCount {
 		
 		// get input data
 		DataSet<String> text = getTextDataSet(env);
-		
+
 		DataSet<Tuple2<String, Integer>> counts = 
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				text.flatMap(new Tokenizer())

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
new file mode 100644
index 0000000..48d6a3d
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+public class ByteArrayInputView extends DataInputStream implements DataInputView {
+
+	public ByteArrayInputView(byte[] data) {
+		super(new ByteArrayInputStream(data));
+	}
+
+	@Override
+	public void skipBytesToRead(int numBytes) throws IOException {
+		while (numBytes > 0) {
+			int skipped = skipBytes(numBytes);
+			numBytes -= skipped;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
deleted file mode 100644
index e5ef665..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-class TestInputView extends DataInputStream implements DataInputView {
-
-	public TestInputView(byte[] data) {
-		super(new ByteArrayInputStream(data));
-	}
-
-	@Override
-	public void skipBytesToRead(int numBytes) throws IOException {
-		while (numBytes > 0) {
-			int skipped = skipBytes(numBytes);
-			numBytes -= skipped;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index 1e645cd..caabb21 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -35,6 +36,9 @@ public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OU
 
 	@Override
 	public TypeInformation<OUT> getType() {
+		if(schema instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable<OUT>) schema).getProducedType();
+		}
 		return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
 				null, null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
new file mode 100644
index 0000000..a29ba4d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.io.IOException;
+
+public class Utils {
+	public static class TypeInformationSerializationSchema<T>
+			implements DeserializationSchema<T>, SerializationSchema<T, byte[]>, ResultTypeQueryable<T> {
+		private final TypeSerializer<T> serializer;
+		private final TypeInformation<T> ti;
+
+		public TypeInformationSerializationSchema(Object type, ExecutionConfig ec) {
+			this.ti = (TypeInformation<T>) TypeExtractor.getForObject(type);
+			this.serializer = ti.createSerializer(ec);
+		}
+		@Override
+		public T deserialize(byte[] message) {
+			try {
+				return serializer.deserialize(new ByteArrayInputView(message));
+			} catch (IOException e) {
+				throw new RuntimeException("Unable to deserialize message", e);
+			}
+		}
+
+		@Override
+		public boolean isEndOfStream(T nextElement) {
+			return false;
+		}
+
+		@Override
+		public byte[] serialize(T element) {
+			DataOutputSerializer dos = new DataOutputSerializer(16);
+			try {
+				serializer.serialize(element, dos);
+			} catch (IOException e) {
+				throw new RuntimeException("Unable to serialize record", e);
+			}
+			return dos.getByteArray();
+		}
+
+		@Override
+		public TypeInformation<T> getProducedType() {
+			return ti;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index cb89248..f9d9508 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -17,13 +17,18 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import com.google.common.base.Preconditions;
-
+import kafka.consumer.ConsumerConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.ConnectorSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
 import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
@@ -33,7 +38,6 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffs
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
@@ -50,25 +54,19 @@ import org.slf4j.LoggerFactory;
 public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	private static final long serialVersionUID = 1L;
-
 	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
 
+	public static final String WAIT_ON_EMPTY_FETCH_KEY = "flink.waitOnEmptyFetchMillis";
+
 	private final String topicId;
-	private final String zookeeperServerAddress;
-	private final int zookeeperSyncTimeMillis;
-	private final int waitOnEmptyFetchMillis;
 	private final KafkaOffset startingOffset;
-
-	private int connectTimeoutMs = 100000;
-	private int bufferSize = 64 * 1024;
+	private transient ConsumerConfig consumerConfig; // ConsumerConfig is not serializable.
 
 	private transient KafkaConsumerIterator iterator;
 	private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
 
 	private transient Map<Integer, KafkaOffset> partitions;
 
-	private volatile boolean isRunning = false;
-
 	/**
 	 * Creates a persistent Kafka source that consumes a topic.
 	 * If there is are no new messages on the topic, this consumer will wait
@@ -109,11 +107,14 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 		this(zookeeperAddress, topicId, deserializationSchema, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis, Offset.FROM_CURRENT);
 	}
 
+
 	/**
 	 * Creates a persistent Kafka source that consumes a topic.
 	 * If there is are no new messages on the topic, this consumer will wait
 	 * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
 	 *
+	 * THIS CONSTRUCTOR IS DEPRECATED: USE the constructor with the ConsumerConfig.
+	 *
 	 * @param zookeeperAddress
 	 * 		Address of the Zookeeper host (with port number).
 	 * @param topicId
@@ -127,18 +128,44 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 	 * @param startOffsetType
 	 * 		The offset to start from (beginning or current).
 	 */
-	public PersistentKafkaSource(String zookeeperAddress, String topicId,
-			DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis,
-			int waitOnEmptyFetchMillis, Offset startOffsetType) {
+	@Deprecated
+	public PersistentKafkaSource(String zookeeperAddress, String topicId,DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis, Offset startOffsetType) {
+		this(topicId, deserializationSchema, startOffsetType, legacyParametersToConsumerConfig(zookeeperAddress, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis));
+	}
+
+	private static ConsumerConfig legacyParametersToConsumerConfig(String zookeeperAddress, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
+		Properties props = new Properties();
+		props.setProperty("zookeeper.sync.time.ms", Integer.toString(zookeeperSyncTimeMillis));
+		props.setProperty(WAIT_ON_EMPTY_FETCH_KEY, Integer.toString(waitOnEmptyFetchMillis));
+		props.setProperty("zookeeper.connect", zookeeperAddress);
+		props.setProperty("group.id", "flink-persistent-kafka-source");
+		return new ConsumerConfig(props);
+	}
+
+	/**
+	 * Creates a persistent Kafka source that consumes a topic.
+	 * If there is are no new messages on the topic, this consumer will wait
+	 * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
+	 *
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param deserializationSchema
+	 * 		User defined deserialization schema.
+	 * @param startOffsetType
+	 * 		The offset to start from (beginning or current).
+	 * @param consumerConfig
+	 * 		Additional configuration for the PersistentKafkaSource.
+	 * 		NOTE: This source will only respect certain configuration values from the config!
+	 */
+	public PersistentKafkaSource(String topicId, DeserializationSchema<OUT> deserializationSchema, Offset startOffsetType, ConsumerConfig consumerConfig) {
 		super(deserializationSchema);
-		Preconditions.checkNotNull(zookeeperAddress, "The Zookeeper address can not be null");
 		Preconditions.checkNotNull(topicId, "The topic id can not be null");
 		Preconditions.checkNotNull(deserializationSchema, "The deserialization schema can not be null");
-		Preconditions.checkArgument(zookeeperSyncTimeMillis > 0, "The sync time must be positive");
-		Preconditions.checkArgument(waitOnEmptyFetchMillis > 0, "The wait time must be positive");
+		Preconditions.checkNotNull(consumerConfig, "ConsumerConfig can not be null");
+
+		this.consumerConfig = consumerConfig;
 
 		this.topicId = topicId;
-		this.zookeeperServerAddress = zookeeperAddress;
 
 		switch (startOffsetType) {
 			case FROM_BEGINNING:
@@ -151,20 +178,17 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 				this.startingOffset = new CurrentOffset();
 				break;
 		}
-
-		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
-		this.waitOnEmptyFetchMillis = waitOnEmptyFetchMillis;
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
 	public void open(Configuration parameters) throws InterruptedException {
+		LOG.info("Starting PersistentKafkaSource");
 		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
 		int indexOfSubtask = context.getIndexOfThisSubtask();
 		int numberOfSubtasks = context.getNumberOfParallelSubtasks();
 
-		KafkaTopicUtils kafkaTopicUtils =
-				new KafkaTopicUtils(zookeeperServerAddress, zookeeperSyncTimeMillis, zookeeperSyncTimeMillis);
+		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(consumerConfig.zkConnect(), consumerConfig.zkSyncTimeMs(), consumerConfig.zkConnectionTimeoutMs());
 
 		int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
 
@@ -187,10 +211,10 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 				context.registerState("kafka", kafkaOffSet);
 			}
 
-			iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, waitOnEmptyFetchMillis, connectTimeoutMs, bufferSize);
+			iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, this.consumerConfig);
 
 			if (LOG.isInfoEnabled()) {
-				LOG.info("KafkaSource ({}/{}) listening to partitions {} of topic {}.",
+				LOG.info("PersistentKafkaSource ({}/{}) listening to partitions {} of topic {}.",
 						indexOfSubtask + 1, numberOfSubtasks, partitions.keySet(), topicId);
 			}
 		}
@@ -200,9 +224,8 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	@Override
 	public void run(Collector<OUT> collector) throws Exception {
-		isRunning = true;
 		MessageWithMetadata msg;
-		while (isRunning && iterator.hasNext()) {
+		while (iterator.hasNext()) {
 			msg = iterator.nextWithOffset();
 			OUT out = schema.deserialize(msg.getMessage());
 
@@ -218,17 +241,21 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 		}
 	}
 
-	public void setConnectTimeoutMs(int connectTimeoutMs) {
-		Preconditions.checkArgument(connectTimeoutMs > 0, "The timeout must be positive");
-		this.connectTimeoutMs = connectTimeoutMs;
+	@Override
+	public void cancel() {
+		LOG.info("PersistentKafkaSource has been cancelled");
 	}
 
-	public void setBufferSize(int bufferSize) {
-		Preconditions.checkArgument(connectTimeoutMs > 0, "The buffer size must be positive");
-		this.bufferSize = bufferSize;
+	private void writeObject(ObjectOutputStream out)
+			throws IOException, ClassNotFoundException {
+		out.defaultWriteObject();
+		out.writeObject(consumerConfig.props().props());
 	}
 
-	@Override
-	public void cancel() {
+	private void readObject(ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		Properties props = (Properties) in.readObject();
+		consumerConfig = new ConsumerConfig(props);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
index b76421e..9da1bea 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import kafka.consumer.ConsumerConfig;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
+import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,15 +34,14 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
 
 	protected List<KafkaSinglePartitionIterator> partitions;
-	protected final int waitOnEmptyFetch;
+	protected final ConsumerConfig consumerConfig;
 
 	public KafkaMultiplePartitionsIterator(String topic,
 										Map<Integer, KafkaOffset> partitionsWithOffset,
-										KafkaTopicUtils kafkaTopicUtils,
-										int waitOnEmptyFetch, int connectTimeoutMs, int bufferSize) {
+										KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
 		partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
 
-		this.waitOnEmptyFetch = waitOnEmptyFetch;
+		this.consumerConfig = consumerConfig;
 
 		for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
 			partitions.add(new KafkaSinglePartitionIterator(
@@ -48,8 +49,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 					partitionWithOffset.getKey(),
 					partitionWithOffset.getValue(),
 					kafkaTopicUtils,
-					connectTimeoutMs,
-					bufferSize));
+					this.consumerConfig));
 		}
 	}
 
@@ -91,7 +91,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 			// do not wait if a new message has been fetched
 			if (!gotNewMessage) {
 				try {
-					Thread.sleep(waitOnEmptyFetch);
+					Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY));
 				} catch (InterruptedException e) {
 					LOG.warn("Interrupted while waiting for new messages", e);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
index 6e326e5..0aaa771 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import kafka.consumer.ConsumerConfig;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
@@ -64,13 +65,11 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	private List<String> replicaBrokers;
 	private String clientName;
 	private Broker leadBroker;
-	private final int connectTimeoutMs;
-	private final int bufferSize;
+	private final ConsumerConfig consumerConfig;
 
 	private KafkaOffset initialOffset;
 	private transient Iterator<MessageAndOffset> iter;
 	private transient FetchResponse fetchResponse;
-	private volatile boolean isRunning;
 
 	/**
 	 * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
@@ -84,19 +83,14 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	 * 		Offset to start consuming at
 	 * @param kafkaTopicUtils
 	 * 		Util for receiving topic metadata
-	 * @param connectTimeoutMs
-	 * 		Connection timeout in milliseconds
-	 * @param bufferSize
-	 * 		Size of buffer
 	 */
 	public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset,
-			KafkaTopicUtils kafkaTopicUtils, int connectTimeoutMs, int bufferSize) {
+			KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
 
 		Set<String> brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition);
 		this.hosts = new ArrayList<String>(brokerAddresses);
 
-		this.connectTimeoutMs = connectTimeoutMs;
-		this.bufferSize = bufferSize;
+		this.consumerConfig = consumerConfig;
 		this.topic = topic;
 		this.partition = partition;
 
@@ -119,7 +113,6 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		}
 
 		PartitionMetadata metadata;
-		isRunning = true;
 		do {
 			metadata = findLeader(hosts, topic, partition);
 			try {
@@ -127,8 +120,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 			} catch (InterruptedException e) {
 				throw new InterruptedException("Establishing connection to Kafka failed");
 			}
-		} while (isRunning && metadata == null);
-		isRunning = false;
+		} while (metadata == null);
 
 		if (metadata.leader() == null) {
 			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
@@ -137,13 +129,12 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		leadBroker = metadata.leader();
 		clientName = "Client_" + topic + "_" + partition;
 
-		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName);
+		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
 
 		try {
 			readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
 		} catch (NotLeaderForPartitionException e) {
 			do {
-
 				metadata = findLeader(hosts, topic, partition);
 
 				try {
@@ -254,7 +245,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 
 	private void resetFetchResponse(long offset) throws InterruptedException, ClosedChannelException {
 		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
-				.addFetch(topic, partition, offset, 100000).build();
+				.addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build();
 
 		fetchResponse = consumer.fetch(req);
 
@@ -283,10 +274,9 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		consumer.close();
 		consumer = null;
 		leadBroker = findNewLeader(leadBroker, topic, partition);
-		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), 100000, 64 * 1024, clientName);
+		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
 	}
 
-	@SuppressWarnings("ConstantConditions")
 	private PartitionMetadata findLeader(List<String> addresses, String a_topic,
 			int a_partition) {
 
@@ -304,7 +294,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 
 			SimpleConsumer consumer = null;
 			try {
-				consumer = new SimpleConsumer(host, port, connectTimeoutMs, bufferSize, "leaderLookup");
+				consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup");
 				List<String> topics = Collections.singletonList(a_topic);
 
 				TopicMetadataRequest req = new TopicMetadataRequest(topics);
@@ -343,7 +333,6 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		return returnMetaData;
 	}
 
-	@SuppressWarnings({"ConstantConditions", "UnusedAssignment"})
 	private Broker findNewLeader(Broker a_oldLeader, String a_topic, int a_partition) throws InterruptedException {
 		for (int i = 0; i < 3; i++) {
 			if (LOG.isInfoEnabled()) {
@@ -370,7 +359,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 				}
 			}
 		}
-		throw new InterruptedException("Unable to find new leader after Broker failure.");
+		throw new RuntimeException("Unable to find new leader after Broker failure.");
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 8205799..9ecd0a3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -28,16 +28,21 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
 
+import kafka.consumer.ConsumerConfig;
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
@@ -158,7 +163,7 @@ public class KafkaITCase {
 
 			@Override
 			public void invoke(Tuple2<Long, String> value) throws Exception {
-				LOG.debug("Got " + value);
+				LOG.debug("Got value = " + value);
 				String[] sp = value.f1.split("-");
 				int v = Integer.parseInt(sp[1]);
 
@@ -237,14 +242,14 @@ public class KafkaITCase {
 		// add consuming topology:
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
 				new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
-		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
 			int elCnt = 0;
 			int start = -1;
 			BitSet validator = new BitSet(101);
 
 			@Override
 			public void invoke(Tuple2<Long, String> value) throws Exception {
-				LOG.debug("Got " + value);
+				LOG.info("Got value " + value);
 				String[] sp = value.f1.split("-");
 				int v = Integer.parseInt(sp[1]);
 
@@ -265,6 +270,12 @@ public class KafkaITCase {
 					throw new SuccessException();
 				}
 			}
+
+			@Override
+			public void close() throws Exception {
+				super.close();
+				Assert.assertTrue("No element received", elCnt > 0);
+			}
 		});
 
 		// add producing topology
@@ -278,6 +289,8 @@ public class KafkaITCase {
 				int cnt = 0;
 				while (running) {
 					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					LOG.info("Produced " + cnt);
+
 					try {
 						Thread.sleep(100);
 					} catch (InterruptedException ignored) {
@@ -311,6 +324,117 @@ public class KafkaITCase {
 		LOG.info("Finished KafkaITCase.tupleTestTopology()");
 	}
 
+	/**
+	 * Test Flink's Kafka integration also with very big records (30MB)
+	 *
+	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void bigRecordTestTopology() throws Exception {
+
+		LOG.info("Starting KafkaITCase.bigRecordTestTopology()");
+
+		String topic = "bigRecordTestTopic";
+		createTestTopic(topic, 1, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig());
+		Properties consumerProps = new Properties();
+		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
+		consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		consumerProps.setProperty("group.id", "test");
+
+		ConsumerConfig cc = new ConsumerConfig(consumerProps);
+		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
+				new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, Offset.FROM_BEGINNING, cc));
+
+		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+			int elCnt = 0;
+
+			@Override
+			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+				elCnt++;
+				if(value.f0 == -1) {
+					// we should have seen 11 elements now.
+					if(elCnt == 11) {
+						throw new SuccessException();
+					} else {
+						throw new RuntimeException("There have been "+elCnt+" elements");
+					}
+				}
+				if(elCnt > 10) {
+					throw new RuntimeException("More than 10 elements seen: "+elCnt);
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+			}
+
+			@Override
+			public void run(Collector<Tuple2<Long, byte[]>> collector) throws Exception {
+				LOG.info("Starting source.");
+				long cnt = 0;
+				Random rnd = new Random(1337);
+				while (running) {
+					//
+					byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
+					collector.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+					LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
+
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException ignored) {
+					}
+					if(cnt == 10) {
+						// signal end
+						collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+						running = false;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+
+		stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(zookeeperConnectionString, topic,
+				new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
+		);
+
+		try {
+			env.setParallelism(1);
+			env.execute();
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
+				}
+			}
+		}
+
+		LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
+	}
+
+
 	private static boolean partitionerHasBeenCalled = false;
 
 	@Test
@@ -616,8 +740,7 @@ public class KafkaITCase {
 					// check if everything in the bitset is set to true
 					int nc;
 					if ((nc = validator.nextClearBit(0)) != numOfMessagesToReceive) {
-//						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
-						System.out.println("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
 					}
 					throw new SuccessException();
 				} else if (elCnt == numOfMessagesToReceive) {
@@ -698,6 +821,8 @@ public class KafkaITCase {
 		kafkaProperties.put("broker.id", Integer.toString(brokerId));
 		kafkaProperties.put("log.dir", tmpFolder.toString());
 		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", "" + (35 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", "" + (35 * 1024 * 1024));
 		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
 
 		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
index 72a18d0..0113cfe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
@@ -21,5 +21,5 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 public interface GenericSourceFunction<T> {
 
-	public TypeInformation<T> getType();
+	TypeInformation<T> getType();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 7ec0b0b..6d6c793 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -56,7 +56,6 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
 	protected TypeSerializer<IN> objectSerializer;
 	protected StreamRecord<IN> nextRecord;
 	protected IN nextObject;
-	protected boolean isMutable;
 
 	public Collector<OUT> collector;
 	protected Function userFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index 08ef461..faaa79b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -28,7 +28,7 @@ public interface DeserializationSchema<T> extends Serializable {
 	 *            The incoming message in a byte array
 	 * @return The deserialized message in the required format.
 	 */
-	public T deserialize(byte[] message);
+	T deserialize(byte[] message);
 
 	/**
 	 * Method to decide whether the element signals the end of the stream. If
@@ -38,5 +38,5 @@ public interface DeserializationSchema<T> extends Serializable {
 	 *            The element to test for end signal
 	 * @return The end signal, if true the stream shuts down
 	 */
-	public boolean isEndOfStream(T nextElement);
+	boolean isEndOfStream(T nextElement);
 }