Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/02 10:13:53 UTC

[1/2] flink git commit: [FLINK-1589] Add option to pass configuration to LocalExecutor

Repository: flink
Updated Branches:
  refs/heads/master 79a92a647 -> 359b39c38


[FLINK-1589] Add option to pass configuration to LocalExecutor

This closes #427


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cf95862
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cf95862
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cf95862

Branch: refs/heads/master
Commit: 7cf9586223e3344c574ef58b99fdace8a34de6c4
Parents: 79a92a6
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Feb 20 12:40:41 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Apr 2 09:00:28 2015 +0200

----------------------------------------------------------------------
 docs/local_execution.md                         |   8 ++
 .../org/apache/flink/client/LocalExecutor.java  |  18 ++-
 .../apache/flink/api/common/PlanExecutor.java   |   6 +-
 .../flink/api/java/ExecutionEnvironment.java    |  15 +++
 .../apache/flink/api/java/LocalEnvironment.java |  10 +-
 .../clients/examples/LocalExecutorITCase.java   |   1 -
 .../ExecutionEnvironmentITCase.java             | 110 +++++++++++++++++++
 7 files changed, 160 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/docs/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/local_execution.md b/docs/local_execution.md
index bac1745..8e7ecc4 100644
--- a/docs/local_execution.md
+++ b/docs/local_execution.md
@@ -77,6 +77,14 @@ public static void main(String[] args) throws Exception {
 
 The `JobExecutionResult` object, which is returned after the execution finished, contains the program runtime and the accumulator results.
 
+The `LocalEnvironment` also allows passing custom configuration values to Flink.
+
+~~~java
+Configuration conf = new Configuration();
+conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
+final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
+~~~
+
 *Note:* The local execution environments do not start any web frontend to monitor the execution.
 
 ## Collection Environment

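For reference, an end-to-end local run with such a custom configuration might look like the sketch below. It combines the documentation snippet above with the output pattern used by the new ExecutionEnvironmentITCase further down; the data set contents are illustrative and not part of this commit.

~~~java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class LocalConfigExample {
	public static void main(String[] args) throws Exception {
		// Custom settings for the embedded mini cluster started by the LocalExecutor.
		Configuration conf = new Configuration();
		conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);

		// New factory method added in this commit.
		final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

		// Collect results locally and trigger execution.
		List<Integer> result = new ArrayList<Integer>();
		env.fromElements(1, 2, 3)
				.output(new LocalCollectionOutputFormat<Integer>(result));
		env.execute();

		System.out.println("Collected " + result.size() + " elements");
	}
}
~~~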
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 5ee4e5d..b1705df 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -53,6 +53,8 @@ public class LocalExecutor extends PlanExecutor {
 	
 	private LocalFlinkMiniCluster flink;
 
+	private Configuration configuration;
+
 	// ---------------------------------- config options ------------------------------------------
 	
 
@@ -70,6 +72,11 @@ public class LocalExecutor extends PlanExecutor {
 		}
 	}
 
+	public LocalExecutor(Configuration conf) {
+		this();
+		this.configuration = conf;
+	}
+
 
 	
 	public boolean isDefaultOverwriteFiles() {
@@ -90,7 +97,7 @@ public class LocalExecutor extends PlanExecutor {
 	
 	// --------------------------------------------------------------------------------------------
 
-	public static Configuration getConfiguration(LocalExecutor le) {
+	public static Configuration createConfiguration(LocalExecutor le) {
 		Configuration configuration = new Configuration();
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
 		configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
@@ -102,7 +109,10 @@ public class LocalExecutor extends PlanExecutor {
 			if (this.flink == null) {
 				
 				// create the embedded runtime
-				Configuration configuration = getConfiguration(this);
+				Configuration configuration = createConfiguration(this);
+				if(this.configuration != null) {
+					configuration.addAll(this.configuration);
+				}
 				// start it up
 				this.flink = new LocalFlinkMiniCluster(configuration, true);
 			} else {
@@ -135,6 +145,7 @@ public class LocalExecutor extends PlanExecutor {
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
 	 */
+	@Override
 	public JobExecutionResult executePlan(Plan plan) throws Exception {
 		if (plan == null) {
 			throw new IllegalArgumentException("The plan may not be null.");
@@ -190,8 +201,9 @@ public class LocalExecutor extends PlanExecutor {
 	 * @return JSON dump of the optimized plan.
 	 * @throws Exception
 	 */
+	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration(this));
+		Optimizer pc = new Optimizer(new DataStatistics(), createConfiguration(this));
 		OptimizedPlan op = pc.compile(plan);
 		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 	

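A note on precedence: when the embedded mini cluster is created, the executor first derives defaults from its own fields via createConfiguration(this) and then overlays the user-supplied configuration with addAll, so user values win over the derived defaults. A minimal sketch of that merge behaviour, using only the Configuration class (the slot numbers are illustrative):

~~~java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class ConfigMergeSketch {
	public static void main(String[] args) {
		// Defaults derived from the executor's own settings (as createConfiguration(le) does).
		Configuration defaults = new Configuration();
		defaults.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

		// User configuration handed to new LocalExecutor(conf).
		Configuration user = new Configuration();
		user.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);

		// addAll copies every entry from 'user', overwriting keys that already exist.
		defaults.addAll(user);

		// Prints 4: the user-supplied value overrides the derived default.
		System.out.println(defaults.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1));
	}
}
~~~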
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 0d48add..9eaddd1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.api.common;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -60,11 +62,11 @@ public abstract class PlanExecutor {
 	 * 
 	 * @return A local executor.
 	 */
-	public static PlanExecutor createLocalExecutor() {
+	public static PlanExecutor createLocalExecutor(Configuration configuration) {
 		Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
 		
 		try {
-			return leClass.newInstance();
+			return leClass.getConstructor(Configuration.class).newInstance(configuration);
 		}
 		catch (Throwable t) {
 			throw new RuntimeException("An error occurred while loading the local executor (" + LOCAL_EXECUTOR_CLASS + ").", t);

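The executor is still instantiated reflectively, so flink-core keeps no compile-time dependency on flink-clients; the only change is that the one-argument Configuration constructor is looked up instead of calling newInstance() on the no-argument one. A stripped-down sketch of that pattern, with a hypothetical class name standing in for the real LOCAL_EXECUTOR_CLASS constant:

~~~java
import org.apache.flink.configuration.Configuration;

public class ReflectiveExecutorSketch {

	/** Illustrative only: create an instance through its (Configuration) constructor. */
	public static Object instantiateWithConfig(String className, Configuration config) throws Exception {
		Class<?> clazz = Class.forName(className);
		// clazz.newInstance() only reaches a no-argument constructor;
		// getConstructor(Configuration.class) selects the new one-argument constructor instead.
		return clazz.getConstructor(Configuration.class).newInstance(config);
	}
}
~~~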
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 9d1fc36..5e71f44 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -63,6 +63,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -1058,6 +1059,20 @@ public abstract class ExecutionEnvironment {
 		lee.setParallelism(parallelism);
 		return lee;
 	}
+
+	/**
+	 * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
+	 * multi-threaded fashion in the same JVM as the environment was created in. It will use the
+	 * custom configuration values specified in the parameter.
+	 *
+	 * @param customConfiguration Pass a custom configuration to the LocalEnvironment.
+	 * @return A local execution environment configured with the given values.
+	 */
+	public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
+		LocalEnvironment lee = new LocalEnvironment();
+		lee.setConfiguration(customConfiguration);
+		return lee;
+	}
 	
 	/**
 	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program 

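Unlike createLocalEnvironment(int parallelism), this overload leaves the parallelism untouched, so a caller can combine a custom configuration with an explicit parallelism setting. The new ExecutionEnvironmentITCase below does exactly that; a condensed sketch along those lines (class and method names are illustrative):

~~~java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class ConfigAndParallelismSketch {
	public static ExecutionEnvironment create() {
		Configuration conf = new Configuration();
		conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 5);

		// The configuration controls the embedded cluster (here: 5 task slots) ...
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
		// ... while the parallelism is set separately; AUTO_MAX then uses all available slots.
		env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
		return env;
	}
}
~~~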
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index beecaf6..33bebf6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.configuration.Configuration;
 
 /**
  * An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
@@ -32,6 +33,7 @@ import org.apache.flink.api.common.PlanExecutor;
  * machine.
  */
 public class LocalEnvironment extends ExecutionEnvironment {
+	private Configuration configuration;
 	/**
 	 * Creates a new local environment.
 	 */
@@ -47,7 +49,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
 		
-		PlanExecutor executor = PlanExecutor.createLocalExecutor();
+		PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
 		return executor.executePlan(p);
 	}
 	
@@ -55,7 +57,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
 	public String getExecutionPlan() throws Exception {
 		Plan p = createProgramPlan(null, false);
 		
-		PlanExecutor executor = PlanExecutor.createLocalExecutor();
+		PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
 		return executor.getOptimizerPlanAsJSON(p);
 	}
 	// --------------------------------------------------------------------------------------------
@@ -65,4 +67,8 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		return "Local Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
 				+ ") : " + getIdString();
 	}
+
+	public void setConfiguration(Configuration customConfiguration) {
+		this.configuration = customConfiguration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 62e2893..4f74740 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -61,6 +61,5 @@ public class LocalExecutorITCase {
 			e.printStackTrace();
 			Assert.fail(e.getMessage());
 		}
-		
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf95862/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
new file mode 100644
index 0000000..4ebc381
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Tests the ExecutionEnvironment from a user's perspective.
+ */
+@RunWith(Parameterized.class)
+public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
+	private static final int PARALLELISM = 5;
+
+	public ExecutionEnvironmentITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Parameterized.Parameters(name = "Execution mode = {0}")
+	public static Collection<TestExecutionMode[]> executionModes(){
+		Collection<TestExecutionMode[]> c = new ArrayList<TestExecutionMode[]>(1);
+		c.add(new TestExecutionMode[] {TestExecutionMode.CLUSTER});
+		return c;
+	}
+
+
+	/**
+	 * Ensure that the user can pass a custom configuration object to the LocalEnvironment
+	 */
+	@Test
+	public void testLocalEnvironmentWithConfig() throws Exception {
+		Configuration conf = new Configuration();
+		conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+		final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
+		env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+		DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
+				.rebalance()
+				.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
+					@Override
+					public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+						out.collect(getRuntimeContext().getIndexOfThisSubtask());
+					}
+				});
+		List<Integer> resultCollection = new ArrayList<Integer>();
+		result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+		env.execute();
+		assertEquals(PARALLELISM, resultCollection.size());
+	}
+
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(PARALLELISM, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+}


[2/2] flink git commit: [FLINK-1753] [streaming] Added test for Kafka connector with tuple type

Posted by rm...@apache.org.
[FLINK-1753] [streaming] Added test for Kafka connector with tuple type

This closes #557


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/359b39c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/359b39c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/359b39c3

Branch: refs/heads/master
Commit: 359b39c383347b14c1a3e382d59717ac1be1b222
Parents: 7cf9586
Author: Gábor Hermann <re...@gmail.com>
Authored: Wed Apr 1 14:51:04 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Apr 2 09:21:23 2015 +0200

----------------------------------------------------------------------
 .../connectors/kafka/api/KafkaSink.java         |   9 +-
 .../connectors/kafka/api/KafkaSource.java       |  20 +
 .../kafka/api/config/PartitionerWrapper.java    |   2 +-
 .../streaming/connectors/kafka/KafkaITCase.java | 428 ++++++++++++++++---
 4 files changed, 406 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index f1dbc8c..0bbf9a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -34,7 +34,6 @@ import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import kafka.serializer.DefaultEncoder;
-import kafka.serializer.StringEncoder;
 
 /**
  * Sink that emits its inputs to a Kafka topic.
@@ -123,7 +122,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		props.put("request.required.acks", "1");
 
 		props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-		props.put("key.serializer.class", StringEncoder.class.getCanonicalName());
+
+		// this will not be used as the key will not be serialized
+		props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
 
 		if (partitioner != null) {
 			props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
@@ -152,7 +153,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN next) {
 		byte[] serialized = schema.serialize(next);
-		producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
+
+		// Sending message without serializable key.
+		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
 	}
 
 	@Override

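The key change above is in the KeyedMessage constructor: the four-argument form passes a null message key, so the configured key serializer is never exercised, while the unserialized element is still handed to the partitioner as the partition key. A hedged sketch against the Kafka 0.8 producer API used by this sink (the helper class is illustrative):

~~~java
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public final class UnkeyedSendSketch {

	private UnkeyedSendSketch() {}

	/** Illustrative helper: publish 'payload' to 'topic' without a message key. */
	public static <T> void send(Producer<T, byte[]> producer, String topic, T element, byte[] payload) {
		// key = null        -> key.serializer.class is never invoked
		// partKey = element -> the raw element is still passed to the configured partitioner
		producer.send(new KeyedMessage<T, byte[]>(topic, null, element, payload));
	}
}
~~~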
http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4a6da3b..a0805c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -66,6 +66,26 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	/**
 	 * Creates a KafkaSource that consumes a topic.
+	 *
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param groupId
+	 *            ID of the consumer group.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 * @param zookeeperSyncTimeMillis
+	 *            Synchronization time with zookeeper.
+	 */
+	public KafkaSource(String zookeeperAddress,
+					String topicId, String groupId,
+					DeserializationSchema<OUT> deserializationSchema,
+					long zookeeperSyncTimeMillis) {
+		this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
+	}
+	/**
+	 * Creates a KafkaSource that consumes a topic.
 	 * 
 	 * @param zookeeperAddress
 	 *            Address of the Zookeeper host (with port number).

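With the new convenience constructor, a consumer can be created without passing an explicit starting offset. A sketch of wiring it into a streaming topology, modelled on the regularKafkaSourceTest added below (addresses, topic and group names are illustrative):

~~~java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

public class KafkaSourceSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

		// zookeeper address, topic, consumer group, deserialization schema, zookeeper sync time (ms)
		DataStreamSource<String> stream = env.addSource(
				new KafkaSource<String>("localhost:2181", "myTopic", "myFlinkGroup",
						new JavaDefaultStringSchema(), 5000));

		stream.print();
		env.execute();
	}
}
~~~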
http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
index f9dd21f..7ae17df 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
@@ -32,7 +32,7 @@ import kafka.utils.VerifiableProperties;
  *
  * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
  *
- * The serialziable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
+ * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
  */
 public class PartitionerWrapper implements Partitioner {
 	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";

http://git-wip-us.apache.org/repos/asf/flink/blob/359b39c3/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 95609f9..9344722 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -24,7 +27,9 @@ import java.net.UnknownHostException;
 import java.util.BitSet;
 import java.util.Properties;
 
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -33,14 +38,19 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -61,74 +71,390 @@ public class KafkaITCase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
 
-	private final String TOPIC = "myTopic";
+	private static int zkPort;
+	private static int kafkaPort;
+	private static String kafkaHost;
+
+	private static String zookeeperConnectionString;
 
-	private int zkPort;
-	private int kafkaPort;
-	private String kafkaHost;
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+	public static File tmpZkDir;
+	public static File tmpKafkaDir;
 
-	private String zookeeperConnectionString;
+	private static TestingServer zookeeper;
+	private static KafkaServer broker1;
 
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-	public File tmpZkDir;
-	public File tmpKafkaDir;
 
-	@Before
-	public void prepare() throws IOException {
+	@BeforeClass
+	public static void prepare() throws IOException {
+		LOG.info("Starting KafkaITCase.prepare()");
 		tmpZkDir = tempFolder.newFolder();
 		tmpKafkaDir = tempFolder.newFolder();
 		kafkaHost = InetAddress.getLocalHost().getHostName();
 		zkPort = NetUtils.getAvailablePort();
 		kafkaPort = NetUtils.getAvailablePort();
 		zookeeperConnectionString = "localhost:" + zkPort;
-	}
 
-	@Test
-	public void test() {
-		LOG.info("Starting KafkaITCase.test()");
-		TestingServer zookeeper = null;
-		KafkaServer broker1 = null;
+		zookeeper = null;
+		broker1 = null;
+
 		try {
+			LOG.info("Starting Zookeeper");
 			zookeeper = getZookeeper();
+			LOG.info("Starting KafkaServer");
 			broker1 = getKafkaServer(0);
-			LOG.info("ZK and KafkaServer started. Creating test topic:");
-			createTestTopic();
-
-			LOG.info("Starting Kafka Topology in Flink:");
-			startKafkaTopology();
-
-			LOG.info("Test succeeded.");
+			LOG.info("ZK and KafkaServer started.");
 		} catch (Throwable t) {
 			LOG.warn("Test failed with exception", t);
 			Assert.fail("Test failed with: " + t.getMessage());
-		} finally {
-			LOG.info("Shutting down all services");
-			if (broker1 != null) {
-				broker1.shutdown();
+		}
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+		LOG.info("Shutting down all services");
+		if (broker1 != null) {
+			broker1.shutdown();
+		}
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			} catch (IOException e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+		}
+	}
+
+	@Test
+	public void regularKafkaSourceTest() throws Exception {
+		LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
+
+		String topic = "regularKafkaSourceTestTopic";
+		createTestTopic(topic, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new TupleSerializationSchema(), 5000));
+		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+			int elCnt = 0;
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.info("Got " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+				elCnt++;
+				if (elCnt == 100) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
 			}
-			if (zookeeper != null) {
-				try {
-					zookeeper.stop();
-				} catch (IOException e) {
-					LOG.warn("ZK.stop() failed", e);
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+
+		try {
+			env.setParallelism(1);
+			env.execute();
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
 				}
 			}
 		}
 
+		LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
 	}
 
-	private void createTestTopic() {
-		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-		kafkaTopicUtils.createTopic(TOPIC, 1, 1);
+	@Test
+	public void tupleTestTopology() throws Exception {
+		LOG.info("Starting KafkaITCase.tupleTestTopology()");
+
+		String topic = "tupleTestTopic";
+		createTestTopic(topic, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+			int elCnt = 0;
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.info("Got " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+				elCnt++;
+				if (elCnt == 100) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema()));
+
+		try {
+			env.setParallelism(1);
+			env.execute();
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
+				}
+			}
+		}
+
+		LOG.info("Finished KafkaITCase.tupleTestTopology()");
 	}
 
-	private void startKafkaTopology() throws Exception {
+	private static boolean partitionerHasBeenCalled = false;
+
+	@Test
+	public void customPartitioningTestTopology() throws Exception {
+		LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
+
+		String topic = "customPartitioningTestTopic";
+		
+		createTestTopic(topic, 3);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
+		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			boolean gotPartition1 = false;
+			boolean gotPartition2 = false;
+			boolean gotPartition3 = false;
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.info("Got " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				switch (v) {
+					case 9:
+						gotPartition1 = true;
+						break;
+					case 19:
+						gotPartition2 = true;
+						break;
+					case 99:
+						gotPartition3 = true;
+						break;
+				}
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+
+				if (gotPartition1 && gotPartition2 && gotPartition3) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), new CustomPartitioner()));
+
+		try {
+			env.setParallelism(1);
+			env.execute();
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				t = t.getCause();
+				if (limit++ == 20) {
+					throw good;
+				}
+			}
+
+			assertTrue(partitionerHasBeenCalled);
+		}
+
+		LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
+	}
+
+	/**
+	 * Partitions a topic with 3 partitions by the Long field of the Tuple2<Long, String> elements.
+	 */
+	private static class CustomPartitioner implements SerializableKafkaPartitioner {
+
+		@Override
+		public int partition(Object key, int numPartitions) {
+			partitionerHasBeenCalled = true;
+
+			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
+			if (tuple.f0 < 10) {
+				return 0;
+			} else if (tuple.f0 < 20) {
+				return 1;
+			} else {
+				return 2;
+			}
+		}
+	}
+
+	private static class TupleSerializationSchema implements DeserializationSchema<Tuple2<Long, String>>, SerializationSchema<Tuple2<Long, String>, byte[]> {
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public Tuple2<Long, String> deserialize(byte[] message) {
+			Object deserializedObject = SerializationUtils.deserialize(message);
+			return (Tuple2<Long, String>) deserializedObject;
+		}
+
+		@Override
+		public byte[] serialize(Tuple2<Long, String> element) {
+			return SerializationUtils.serialize(element);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
+			return false;
+		}
+
+	}
+
+	@Test
+	public void simpleTestTopology() throws Exception {
+		String topic = "simpleTestTopic";
+
+		createTestTopic(topic, 1);
+
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		// add consuming topology:
 		DataStreamSource<String> consuming = env.addSource(
-				new PersistentKafkaSource<String>(zookeeperConnectionString, TOPIC, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
+				new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING));
 		consuming.addSink(new SinkFunction<String>() {
 			int elCnt = 0;
 			int start = -1;
@@ -176,11 +502,11 @@ public class KafkaITCase {
 
 			@Override
 			public void cancel() {
-				LOG.info("Source got chancel()");
+				LOG.info("Source got cancel()");
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<String>(zookeeperConnectionString, TOPIC, new JavaDefaultStringSchema()));
+		stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()));
 
 		try {
 			env.setParallelism(1);
@@ -191,21 +517,28 @@ public class KafkaITCase {
 			while (!(t instanceof SuccessException)) {
 				t = t.getCause();
 				if (limit++ == 20) {
-					throw good;
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
 				}
 			}
 		}
 	}
 
 
-	private TestingServer getZookeeper() throws Exception {
+	private void createTestTopic(String topic, int numberOfPartitions) {
+		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
+		kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+	}
+
+
+	private static TestingServer getZookeeper() throws Exception {
 		return new TestingServer(zkPort, tmpZkDir);
 	}
 
 	/**
 	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
 	 */
-	private KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
+	private static KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
 		Properties kafkaProperties = new Properties();
 		// properties have to be Strings
 		kafkaProperties.put("advertised.host.name", kafkaHost);
@@ -220,13 +553,12 @@ public class KafkaITCase {
 		return server;
 	}
 
-	public class LocalSystemTime implements Time {
+	public static class LocalSystemTime implements Time {
 
 		@Override
 		public long milliseconds() {
 			return System.currentTimeMillis();
 		}
-
 		public long nanoseconds() {
 			return System.nanoTime();
 		}
@@ -243,9 +575,7 @@ public class KafkaITCase {
 	}
 
 	public static class SuccessException extends Exception {
-
 		private static final long serialVersionUID = 1L;
-
 	}
 
 }