Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 18:09:51 UTC

[1/5] flink git commit: Revert "[FLINK-4177] Harden CassandraConnectorITCase"

Repository: flink
Updated Branches:
  refs/heads/master 611412c6b -> 02c10d312


Revert "[FLINK-4177] Harden CassandraConnectorITCase"

This reverts commit 62523acbe175cf159fe1b4ab6cf5c0412fc4d232.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02c10d31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02c10d31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02c10d31

Branch: refs/heads/master
Commit: 02c10d312371aaad12ed8961cdf96288fe78a983
Parents: 95d640b
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 16 17:59:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../flink-connector-cassandra/pom.xml           |  11 +-
 .../connectors/cassandra/CassandraSinkBase.java |  39 +-
 .../cassandra/CassandraConnectorITCase.java     | 374 +++++++++++--------
 .../connectors/cassandra/CassandraService.java  | 118 ------
 .../src/test/resources/cassandra.yaml           |  41 +-
 5 files changed, 232 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
index 07cdc09..3a1731c 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
@@ -37,8 +37,8 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<cassandra.version>2.2.7</cassandra.version>
-		<driver.version>3.0.3</driver.version>
+		<cassandra.version>2.2.5</cassandra.version>
+		<driver.version>3.0.0</driver.version>
 	</properties>
 
 	<build>
@@ -159,13 +159,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
-		<!-- we need this dependency for the EmbeddedCassandraService-->
-		<dependency>
-			<groupId>org.caffinitas.ohc</groupId>
-			<artifactId>ohc-core</artifactId>
-			<version>0.4.5</version>
-			<scope>test</scope>
-		</dependency>
 		<dependency>
 			<groupId>org.apache.cassandra</groupId>
 			<artifactId>cassandra-all</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 9c4c430..49b1efa 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -29,8 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
@@ -42,13 +40,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	protected transient Cluster cluster;
 	protected transient Session session;
 
-	protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
+	protected transient Throwable exception = null;
 	protected transient FutureCallback<V> callback;
 
 	private final ClusterBuilder builder;
 
-	protected final AtomicInteger updatesPending = new AtomicInteger();
-
 	protected CassandraSinkBase(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
@@ -59,24 +55,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 		this.callback = new FutureCallback<V>() {
 			@Override
 			public void onSuccess(V ignored) {
-				int pending = updatesPending.decrementAndGet();
-				if (pending == 0) {
-					synchronized (updatesPending) {
-						updatesPending.notifyAll();
-					}
-				}
 			}
 
 			@Override
 			public void onFailure(Throwable t) {
-				int pending = updatesPending.decrementAndGet();
-				if (pending == 0) {
-					synchronized (updatesPending) {
-						updatesPending.notifyAll();
-					}
-				}
-				exception.set(t);
-				
+				exception = t;
 				LOG.error("Error while sending value.", t);
 			}
 		};
@@ -86,12 +69,10 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	@Override
 	public void invoke(IN value) throws Exception {
-		Throwable e = exception.get();
-		if (e != null) {
-			throw new IOException("Error while sending value.", e);
+		if (exception != null) {
+			throw new IOException("invoke() failed", exception);
 		}
 		ListenableFuture<V> result = send(value);
-		updatesPending.incrementAndGet();
 		Futures.addCallback(result, callback);
 	}
 
@@ -99,14 +80,6 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	@Override
 	public void close() {
-		while (updatesPending.get() > 0) {
-			synchronized (updatesPending) {
-				try {
-					updatesPending.wait();
-				} catch (InterruptedException e) {
-				}
-			}
-		}
 		try {
 			if (session != null) {
 				session.close();
@@ -121,9 +94,5 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 		} catch (Exception e) {
 			LOG.error("Error while closing cluster.", e);
 		}
-		Throwable e = exception.get();
-		if (e != null) {
-			LOG.error("Error while sending value.", e);
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 258ef52..2bb6fd1 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -20,138 +20,192 @@ package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.HostDistance;
-import com.datastax.driver.core.PoolingOptions;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SocketOptions;
+
+import org.apache.cassandra.service.CassandraDaemon;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.TestEnvironment;
 
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
+import java.util.Scanner;
 import java.util.UUID;
 
+import static org.junit.Assert.*;
+
 @SuppressWarnings("serial")
-public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraConnectorITCase.TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
-
-	private static final String TABLE_NAME_PREFIX = "flink_";
-	private static final String TABLE_NAME_VARIABLE = "$TABLE";
-	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-	private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);";
-	private static final String INSERT_DATA_QUERY = "INSERT INTO flink." + TABLE_NAME_VARIABLE + " (id, counter, batch_id) VALUES (?, ?, ?)";
-	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
+	private static File tmpDir;
 
 	private static final boolean EMBEDDED = true;
 
-	private static final ClusterBuilder builder = new ClusterBuilder() {
+	private static EmbeddedCassandraService cassandra;
+
+	private static ClusterBuilder builder = new ClusterBuilder() {
 		@Override
 		protected Cluster buildCluster(Cluster.Builder builder) {
 			return builder
 				.addContactPoint("127.0.0.1")
-				.withSocketOptions(new SocketOptions().setConnectTimeoutMillis(30000))
 				.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
-				.withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 32, 32).setMaxRequestsPerConnection(HostDistance.LOCAL, 2048).setPoolTimeoutMillis(15000))
 				.withoutJMXReporting()
 				.withoutMetrics().build();
 		}
 	};
 
-	private static final List<Tuple3<String, Integer, Integer>> collection = new ArrayList<>();
-
-	private static CassandraService cassandra;
 	private static Cluster cluster;
 	private static Session session;
 
-	private static final Random random = new Random();
-	private int tableID;
+	private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+	private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
+	private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
+	private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
+	private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
+	private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
 
-	@BeforeClass
-	public static void generateCollection() {
+	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+
+	static {
 		for (int i = 0; i < 20; i++) {
 			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
 		}
 	}
 
+	private static class EmbeddedCassandraService {
+		CassandraDaemon cassandraDaemon;
+
+		public void start() throws IOException {
+			this.cassandraDaemon = new CassandraDaemon();
+			this.cassandraDaemon.init(null);
+			this.cassandraDaemon.start();
+		}
+
+		public void stop() {
+			this.cassandraDaemon.stop();
+		}
+	}
+
+	private static LocalFlinkMiniCluster flinkCluster;
+
+	// ------------------------------------------------------------------------
+	//  Cluster Setup (Cassandra & Flink)
+	// ------------------------------------------------------------------------
+
 	@BeforeClass
 	public static void startCassandra() throws IOException {
 
 		// check if we should run this test, current Cassandra version requires Java >= 1.8
-		CommonTestUtils.assumeJava8();
-
-		try {
-			cassandra = new CassandraService();
-		} catch (Exception e) {
-			LOG.error("Failed to instantiate cassandra service.", e);
-			Assert.fail("Failed to instantiate cassandra service.");
+		org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
+
+		// generate temporary files
+		tmpDir = CommonTestUtils.createTempDirectory();
+		ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
+		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
+		File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
+		
+		assertTrue(tmp.createNewFile());
+
+		try (
+			BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+			//copy cassandra.yaml; inject absolute paths into cassandra.yaml
+			Scanner scanner = new Scanner(file);
+		) {
+			while (scanner.hasNextLine()) {
+				String line = scanner.nextLine();
+				line = line.replace("$PATH", "'" + tmp.getParentFile());
+				b.write(line + "\n");
+				b.flush();
+			}
 		}
 
+
+		// Tell cassandra where the configuration files are.
+		// Use the test configuration file.
+		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
+
 		if (EMBEDDED) {
-			cassandra.startProcess();
+			cassandra = new EmbeddedCassandraService();
+			cassandra.start();
 		}
 
-		long start = System.currentTimeMillis();
-		long deadline = start + 1000 * 30;
-		while (true) {
-			try {
-				cluster = builder.getCluster();
-				session = cluster.connect();
-				break;
-			} catch (Exception e) {
-				if (System.currentTimeMillis() > deadline) {
-					throw e;
-				}
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException ignored) {
-				}
-			}
+		try {
+			Thread.sleep(1000 * 10);
+		} catch (InterruptedException e) { //give cassandra a few seconds to start up
 		}
-		LOG.debug("Connection established after {}ms.", System.currentTimeMillis() - start);
+
+		cluster = builder.getCluster();
+		session = cluster.connect();
 
 		session.execute(CREATE_KEYSPACE_QUERY);
-		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
+		session.execute(CREATE_TABLE_QUERY);
 	}
 
-	@Before
-	public void createTable() {
-		tableID = random.nextInt(Integer.MAX_VALUE);
-		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID));
+	@BeforeClass
+	public static void startFlink() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+		flinkCluster = new LocalFlinkMiniCluster(config);
+		flinkCluster.start();
+	}
+
+	@AfterClass
+	public static void stopFlink() {
+		if (flinkCluster != null) {
+			flinkCluster.stop();
+			flinkCluster = null;
+		}
 	}
 
 	@AfterClass
 	public static void closeCassandra() {
 		if (session != null) {
+			session.executeAsync(DROP_KEYSPACE_QUERY);
 			session.close();
 		}
 
@@ -159,11 +213,29 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			cluster.close();
 		}
 
-		if (EMBEDDED) {
-			if (cassandra != null) {
-				cassandra.destroy();
-			}
+		if (cassandra != null) {
+			cassandra.stop();
 		}
+
+		if (tmpDir != null) {
+			//noinspection ResultOfMethodCallIgnored
+			tmpDir.delete();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test preparation & cleanup
+	// ------------------------------------------------------------------------
+
+	@Before
+	public void initializeExecutionEnvironment() {
+		TestStreamEnvironment.setAsContext(flinkCluster, 4);
+		new TestEnvironment(flinkCluster, 4, false).setAsContext();
+	}
+
+	@After
+	public void deleteSchema() throws Exception {
+		session.executeAsync(CLEAR_TABLE_QUERY);
 	}
 
 	// ------------------------------------------------------------------------
@@ -171,9 +243,9 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
-		return new TestCassandraTupleWriteAheadSink<>(
-			TABLE_NAME_PREFIX + tableID,
+	protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
+		return new CassandraTupleWriteAheadSink<>(
+			INSERT_DATA_QUERY,
 			TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
 			builder,
 			new CassandraCommitter(builder));
@@ -192,42 +264,43 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	@Override
 	protected void verifyResultsIdealCircumstances(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
 			list.add(x);
 		}
 
 		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
+			list.remove(new Integer(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}
 
 	@Override
 	protected void verifyResultsDataPersistenceUponMissedNotify(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 60; x++) {
 			list.add(x);
 		}
 
 		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
+			list.remove(new Integer(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}
 
 	@Override
 	protected void verifyResultsDataDiscardingUponRestore(
 		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-		ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList<Integer> list = new ArrayList<>();
 		for (int x = 1; x <= 20; x++) {
 			list.add(x);
@@ -237,24 +310,23 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		}
 
 		for (Row s : result) {
-			list.remove(Integer.valueOf(s.getInt("counter")));
+			list.remove(new Integer(s.getInt("counter")));
 		}
-		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
 	}
 
 	@Test
 	public void testCassandraCommitter() throws Exception {
-		String jobID = new JobID().toString();
-		CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-		cc1.setJobId(jobID);
+		CassandraCommitter cc1 = new CassandraCommitter(builder);
+		cc1.setJobId("job");
 		cc1.setOperatorId("operator");
 
-		CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-		cc2.setJobId(jobID);
+		CassandraCommitter cc2 = new CassandraCommitter(builder);
+		cc2.setJobId("job");
 		cc2.setOperatorId("operator");
 
-		CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-		cc3.setJobId(jobID);
+		CassandraCommitter cc3 = new CassandraCommitter(builder);
+		cc3.setJobId("job");
 		cc3.setOperatorId("operator1");
 
 		cc1.createResource();
@@ -280,17 +352,17 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		cc2.close();
 		cc3.close();
 
-		CassandraCommitter cc4 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-		cc4.setJobId(jobID);
-		cc4.setOperatorId("operator");
+		cc1 = new CassandraCommitter(builder);
+		cc1.setJobId("job");
+		cc1.setOperatorId("operator");
 
-		cc4.open();
+		cc1.open();
 
 		//verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
-		Assert.assertTrue(cc4.isCheckpointCommitted(0, 1));
-		Assert.assertFalse(cc4.isCheckpointCommitted(0, 2));
+		Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
+		Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
 
-		cc4.close();
+		cc1.close();
 	}
 
 	// ------------------------------------------------------------------------
@@ -299,94 +371,70 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 	@Test
 	public void testCassandraTupleAtLeastOnceSink() throws Exception {
-		CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 
-		sink.open(new Configuration());
+		DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
+		source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
 
-		for (Tuple3<String, Integer, Integer> value : collection) {
-			sink.send(value);
-		}
-
-		sink.close();
-
-		synchronized (sink.updatesPending) {
-			if (sink.updatesPending.get() != 0) {
-				sink.updatesPending.wait();
-			}
-		}
+		env.execute();
 
-		ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID));
-		try {
-			Assert.assertEquals(20, rs.all().size());
-		} catch (Throwable e) {
-			LOG.error("test failed.", e);
-		}
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
 	}
 
 	@Test
 	public void testCassandraPojoAtLeastOnceSink() throws Exception {
-		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
-
-		CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
-
-		sink.open(new Configuration());
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStreamSource<Pojo> source = env
+			.addSource(new SourceFunction<Pojo>() {
+
+				private boolean running = true;
+				private volatile int cnt = 0;
+
+				@Override
+				public void run(SourceContext<Pojo> ctx) throws Exception {
+					while (running) {
+						ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
+						cnt++;
+						if (cnt == 20) {
+							cancel();
+						}
+					}
+				}
 
-		for (int x = 0; x < 20; x++) {
-			sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
-		}
+				@Override
+				public void cancel() {
+					running = false;
+				}
+			});
 
-		sink.close();
+		source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
 
-		synchronized (sink.updatesPending) {
-			while (sink.updatesPending.get() != 0) {
-				sink.updatesPending.wait();
-			}
-		}
+		env.execute();
 
-		ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
-		try {
-			Assert.assertEquals(20, rs.all().size());
-		} catch (Throwable e) {
-			LOG.error("test failed.", e);
-		}
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
 	}
 
 	@Test
 	public void testCassandraBatchFormats() throws Exception {
-		OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
-		sink.configure(new Configuration());
-		sink.open(0, 1);
-
-		for (Tuple3<String, Integer, Integer> value : collection) {
-			sink.writeRecord(value);
-		}
-
-		sink.close();
-
-		InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
-		source.configure(new Configuration());
-		source.open(null);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 
-		List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
+		DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
+		dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
 
-		while (!source.reachedEnd()) {
-			result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
-		}
+		env.execute("Write data");
 
-		source.close();
-		try {
-			Assert.assertEquals(20, result.size());
-		} catch (Throwable e) {
-			LOG.error("test failed.", e);
-		}
-	}
+		DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
+			new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
+			TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
 
-	protected static class TestCassandraTupleWriteAheadSink<IN extends Tuple> extends CassandraTupleWriteAheadSink<IN> {
-		private final String tableName;
 
-		private TestCassandraTupleWriteAheadSink(String tableName, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
-			super(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, tableName), serializer, builder, committer);
-			this.tableName = tableName;
-		}
+		long count = inputDS.count();
+		Assert.assertEquals(count, 20L);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
deleted file mode 100644
index 2e649e4..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.TestJvmProcess;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Scanner;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertTrue;
-
-public class CassandraService extends TestJvmProcess {
-	private File tmpDir;
-	private File tmpCassandraYaml;
-
-	public CassandraService() throws Exception {
-		createCassandraYaml();
-		setJVMMemory(512);
-	}
-
-	private void createCassandraYaml() throws IOException {
-		// generate temporary files
-		tmpDir = CommonTestUtils.createTempDirectory();
-		ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
-		File file = new File(classLoader.getResource("cassandra.yaml").getFile());
-		tmpCassandraYaml = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-
-		assertTrue(tmpCassandraYaml.createNewFile());
-		BufferedWriter b = new BufferedWriter(new FileWriter(tmpCassandraYaml));
-
-		//copy cassandra.yaml; inject absolute paths into cassandra.yaml
-		Scanner scanner = new Scanner(file);
-		while (scanner.hasNextLine()) {
-			String line = scanner.nextLine();
-			line = line.replace("$PATH", "'" + tmpCassandraYaml.getParentFile());
-			b.write(line + "\n");
-			b.flush();
-		}
-		scanner.close();
-	}
-
-	@Override
-	public String getName() {
-		return "CassandraService";
-	}
-
-	@Override
-	public String[] getJvmArgs() {
-		return new String[]{
-			tmpCassandraYaml.toURI().toString(),
-			// these options were taken directly from the jvm.options file in the cassandra repo
-			"-XX:+UseThreadPriorities",
-			"-Xss256k",
-			"-XX:+AlwaysPreTouch",
-			"-XX:+UseTLAB",
-			"-XX:+ResizeTLAB",
-			"-XX:+UseNUMA",
-			"-XX:+PerfDisableSharedMem",
-			"-XX:+UseParNewGC",
-			"-XX:+UseConcMarkSweepGC",
-			"-XX:+CMSParallelRemarkEnabled",
-			"-XX:SurvivorRatio=8",
-			"-XX:MaxTenuringThreshold=1",
-			"-XX:CMSInitiatingOccupancyFraction=75",
-			"-XX:+UseCMSInitiatingOccupancyOnly",
-			"-XX:CMSWaitDuration=10000",
-			"-XX:+CMSParallelInitialMarkEnabled",
-			"-XX:+CMSEdenChunksRecordAlways",
-			"-XX:+CMSClassUnloadingEnabled",};
-	}
-
-	@Override
-	public String getEntryPointClassName() {
-		return CassandraServiceEntryPoint.class.getName();
-	}
-
-	public static class CassandraServiceEntryPoint {
-		public static void main(String[] args) throws InterruptedException {
-			final CassandraDaemon cassandraDaemon = new CassandraDaemon();
-
-			System.setProperty("cassandra.config", args[0]);
-
-			cassandraDaemon.activate();
-
-			Runtime.getRuntime().addShutdownHook(new Thread() {
-				@Override
-				public void run() {
-					cassandraDaemon.deactivate();
-				}
-			});
-
-			// Run forever
-			new CountDownLatch(1).await();
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
index 77ee0ac..0594ea3 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
@@ -15,40 +15,29 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
-auto_snapshot: false
-
 cluster_name: 'Test Cluster'
+commitlog_sync: 'periodic'
+commitlog_sync_period_in_ms: 10000
+commitlog_segment_size_in_mb: 16
+partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
 commitlog_directory: $PATH/commit'
-commitlog_sync: periodic
-commitlog_sync_period_in_ms: 5000
-
 data_file_directories:
     - $PATH/data'
-disk_access_mode: mmap
-
-endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
-
-listen_address: '127.0.0.1'
-
-memtable_allocation_type: offheap_objects
-
-native_transport_port: 9042
-
-partitioner: org.apache.cassandra.dht.Murmur3Partitioner
-
-read_request_timeout_in_ms: 15000
-request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
-request_scheduler_id: keyspace
-rpc_port: 9170
-
 saved_caches_directory: $PATH/cache'
+listen_address: '127.0.0.1'
 seed_provider:
     - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
       parameters:
           - seeds: '127.0.0.1'
+native_transport_port: 9042
+
+concurrent_reads: 8
+concurrent_writes: 8
+
+auto_bootstrap: false
+auto_snapshot: false
+
 start_rpc: false
 start_native_transport: true
-storage_port: 7010
-
-write_request_timeout_in_ms: 15000
+native_transport_max_threads: 8


[4/5] flink git commit: Revert "[hotfix] [cassandra] Fix CassandraSinkBase serialization issue"

Posted by se...@apache.org.
Revert "[hotfix] [cassandra] Fix CassandraSinkBase serialization issue"

This reverts commit 5fa389014a3ce40534703c8a5731c8a9a955058a.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95d640b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95d640b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95d640b8

Branch: refs/heads/master
Commit: 95d640b832a91b05d6031dbb09086206369666d3
Parents: cf78542
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 16 17:59:01 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/cassandra/CassandraSinkBase.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95d640b8/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 713a286..9c4c430 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -42,7 +42,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 	protected transient Cluster cluster;
 	protected transient Session session;
 
-	protected transient AtomicReference<Throwable> exception;
+	protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
 	protected transient FutureCallback<V> callback;
 
 	private final ClusterBuilder builder;
@@ -56,7 +56,6 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
 	@Override
 	public void open(Configuration configuration) {
-		this.exception = new AtomicReference<>();
 		this.callback = new FutureCallback<V>() {
 			@Override
 			public void onSuccess(V ignored) {


[2/5] flink git commit: Revert "[hotfix] [cassandra] [tests] Show error log messages"

Posted by se...@apache.org.
Revert "[hotfix] [cassandra] [tests] Show error log messages"

This reverts commit 1adefee2eff9c9b4267f5b5e10022255cfcaf47a.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf785422
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf785422
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf785422

Branch: refs/heads/master
Commit: cf78542232e9bed62fbb1a0de568bf26adbdb098
Parents: a66e7ad
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 16 17:58:24 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../connectors/cassandra/CassandraConnectorITCase.java         | 6 +-----
 .../src/test/resources/log4j-test.properties                   | 2 +-
 2 files changed, 2 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf785422/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index acfe848..258ef52 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -161,11 +161,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 		if (EMBEDDED) {
 			if (cassandra != null) {
-				try {
-					cassandra.destroy();
-				} catch (IllegalStateException ignored) {
-					// the process was never started
-				}
+				cassandra.destroy();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf785422/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
index 0a630f4..a43d556 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=ERROR, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target= System.err


[5/5] flink git commit: [FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to better handle shutdown/interrupt situations

Posted by se...@apache.org.
[FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to better handle shutdown/interrupt situations

Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate thread that operated Kafka's consumer.
That thread was shielded from interrupts, because the Kafka Consumer does not handle thread interrupts well.
Since that thread was also the thread that emitted records, it would block in the network stack (backpressure) or in chained operators.
The latter case led to situations where cancellation became very slow unless that thread was interrupted (which it could not be).

This commit changes the thread model:
  - A spawned consumer thread polls a batch of records from the KafkaConsumer and pushes the
    batch of records into a blocking queue (size one)
  - The main thread of the task pulls the record batches from the blocking queue and
    emits the records (a minimal sketch of this handover pattern follows below).
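
A minimal sketch of this handover pattern, using only a plain BlockingQueue of capacity one. The class and names below are illustrative and are not the actual Flink Handover/KafkaConsumerThread implementation added by this commit:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class HandoverSketch {

        public static void main(String[] args) throws Exception {
            // capacity 1: the polling thread can be at most one batch ahead of the emitter
            final BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

            // stand-in for the thread that operates the KafkaConsumer
            Thread consumerThread = new Thread(() -> {
                try {
                    for (int i = 0; i < 3; i++) {
                        List<String> batch = Arrays.asList("record-" + i + "-a", "record-" + i + "-b");
                        handover.put(batch); // blocks until the previous batch has been taken
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit cleanly when interrupted
                }
            }, "kafka-consumer-thread");
            consumerThread.start();

            // the task's main thread pulls batches and emits the records; if it blocks
            // (backpressure) and gets interrupted, the consumer thread is unaffected
            for (int i = 0; i < 3; i++) {
                for (String record : handover.take()) {
                    System.out.println("emit: " + record);
                }
            }
            consumerThread.join();
        }
    }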


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a66e7ad1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a66e7ad1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a66e7ad1

Branch: refs/heads/master
Commit: a66e7ad14e41fa07737f447d68920ad5cc4ed6d3
Parents: fa1864c
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 10 11:13:43 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  20 +
 .../kafka/internal/Kafka010Fetcher.java         |   7 +-
 .../internal/KafkaConsumerCallBridge010.java    |  40 ++
 .../connectors/kafka/Kafka010FetcherTest.java   | 172 ++++++++-
 .../kafka/KafkaShortRetention010ITCase.java     |  34 --
 .../connectors/kafka/internal/Handover.java     | 214 ++++++++++
 .../kafka/internal/Kafka09Fetcher.java          | 274 +++----------
 .../kafka/internal/KafkaConsumerCallBridge.java |  41 ++
 .../kafka/internal/KafkaConsumerThread.java     | 332 ++++++++++++++++
 .../connectors/kafka/Kafka09FetcherTest.java    | 164 +++++++-
 .../kafka/KafkaShortRetention09ITCase.java      |  34 --
 .../connectors/kafka/internal/HandoverTest.java | 387 +++++++++++++++++++
 .../kafka/KafkaShortRetentionTestBase.java      |   1 +
 13 files changed, 1422 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index cc7f56f..32bc1d2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -100,6 +100,26 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Throws the given {@code Throwable} in scenarios where the signatures do allow to
+	 * throw a Exception. Errors and Exceptions are thrown directly, other "exotic"
+	 * subclasses of Throwable are wrapped in an Exception.
+	 *
+	 * @param t The throwable to be thrown.
+	 * @param parentMessage The message for the parent Exception, if one is needed.
+	 */
+	public static void rethrowException(Throwable t, String parentMessage) throws Exception {
+		if (t instanceof Error) {
+			throw (Error) t;
+		}
+		else if (t instanceof Exception) {
+			throw (Exception) t;
+		}
+		else {
+			throw new Exception(parentMessage, t);
+		}
+	}
+
+	/**
 	 * Tries to throw the given {@code Throwable} in scenarios where the signatures allows only IOExceptions
 	 * (and RuntimeException and Error). Throws this exception directly, if it is an IOException,
 	 * a RuntimeException, or an Error. Otherwise does nothing.
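
A short usage sketch for the new rethrowException helper above; the surrounding class and names (RethrowSketch, errorRef, the message text) are illustrative only and not part of this commit:

    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.flink.util.ExceptionUtils;

    public class RethrowSketch {
        // hypothetical helper: rethrow an error collected from a worker thread, if any
        static void checkAndRethrow(AtomicReference<Throwable> errorRef) throws Exception {
            Throwable error = errorRef.get();
            if (error != null) {
                // Errors and Exceptions are rethrown directly, other Throwables are wrapped
                ExceptionUtils.rethrowException(error, "Exception in the fetcher");
            }
        }
    }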

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 024cd38..71dd29a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -29,7 +29,6 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
@@ -91,11 +90,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 
 	/**
 	 * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
-	 * changing the List in the signature to a Collection.
+	 * changing binary signatures
 	 */
 	@Override
-	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
-		consumer.assign(topicPartitions);
+	protected KafkaConsumerCallBridge010 createCallBridge() {
+		return new KafkaConsumerCallBridge010();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
new file mode 100644
index 0000000..a81b098
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ * changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against different method signatures.
+ */
+public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
+
+	@Override
+	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
+		consumer.assign(topicPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 037d25b..6ee0429 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -20,16 +20,20 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -45,6 +49,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,6 +59,7 @@ import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -69,7 +75,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
  * Unit tests for the {@link Kafka010Fetcher}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(Kafka010Fetcher.class)
+@PrepareForTest(KafkaConsumerThread.class)
 public class Kafka010FetcherTest {
 
     @Test
@@ -125,7 +131,7 @@ public class Kafka010FetcherTest {
                 getClass().getClassLoader(),
                 false, /* checkpointing */
                 "taskname-with-subtask",
-                mock(MetricGroup.class),
+                new UnregisteredMetricsGroup(),
                 schema,
                 new Properties(),
                 0L,
@@ -174,9 +180,13 @@ public class Kafka010FetcherTest {
         fetcherRunner.join();
 
         // check that there were no errors in the fetcher
-        final Throwable caughtError = error.get();
-        if (caughtError != null) {
-            throw new Exception("Exception in the fetcher", caughtError);
+        final Throwable fetcherError = error.get();
+        if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+            throw new Exception("Exception in the fetcher", fetcherError);
+        }
+        final Throwable committerError = commitError.get();
+        if (committerError != null) {
+            throw new Exception("Exception in the committer", committerError);
         }
     }
 
@@ -258,7 +268,7 @@ public class Kafka010FetcherTest {
                 getClass().getClassLoader(),
                 false, /* checkpointing */
                 "taskname-with-subtask",
-                mock(MetricGroup.class),
+                new UnregisteredMetricsGroup(),
                 schema,
                 new Properties(),
                 0L,
@@ -321,8 +331,154 @@ public class Kafka010FetcherTest {
 
         // check that there were no errors in the fetcher
         final Throwable caughtError = error.get();
-        if (caughtError != null) {
+        if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
             throw new Exception("Exception in the fetcher", caughtError);
         }
     }
+
+    @Test
+    public void testCancellationWhenEmitBlocks() throws Exception {
+
+        // ----- some test data -----
+
+        final String topic = "test-topic";
+        final int partition = 3;
+        final byte[] payload = new byte[] {1, 2, 3, 4};
+
+        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+        data.put(new TopicPartition(topic, partition), records);
+
+        final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+        // ----- the test consumer -----
+
+        final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+                return consumerRecords;
+            }
+        });
+
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- build a fetcher -----
+
+        BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                sourceContext,
+                topics,
+                null, /* periodic watermark extractor */
+                null, /* punctuated watermark extractor */
+                new TestProcessingTimeService(),
+                10, /* watermark interval */
+                this.getClass().getClassLoader(),
+                true, /* checkpointing */
+                "task_name",
+                new UnregisteredMetricsGroup(),
+                schema,
+                new Properties(),
+                0L,
+                false);
+
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
+
+            @Override
+            public void run() {
+                try {
+                    fetcher.runFetchLoop();
+                } catch (Throwable t) {
+                    error.set(t);
+                }
+            }
+        };
+        fetcherRunner.start();
+
+        // wait until the thread started to emit records to the source context
+        sourceContext.waitTillHasBlocker();
+
+        // now we try to cancel the fetcher, including the interruption usually done on the task thread
+        // once it has finished, there must be no more thread blocked on the source context
+        fetcher.cancel();
+        fetcherRunner.interrupt();
+        fetcherRunner.join();
+
+        assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+
+    private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+        private final ReentrantLock lock = new ReentrantLock();
+        private final OneShotLatch inBlocking = new OneShotLatch();
+
+        @Override
+        public void collect(T element) {
+            block();
+        }
+
+        @Override
+        public void collectWithTimestamp(T element, long timestamp) {
+            block();
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {
+            block();
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return new Object();
+        }
+
+        @Override
+        public void close() {}
+
+        public void waitTillHasBlocker() throws InterruptedException {
+            inBlocking.await();
+        }
+
+        public boolean isStillBlocking() {
+            return lock.isLocked();
+        }
+
+        @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+        private void block() {
+            lock.lock();
+            try {
+                inBlocking.trigger();
+
+                // put this thread to sleep indefinitely
+                final Object o = new Object();
+                while (true) {
+                    synchronized (o) {
+                        o.wait();
+                    }
+                }
+            }
+            catch (InterruptedException e) {
+                // exit cleanly, simply reset the interruption flag
+                Thread.currentThread().interrupt();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
deleted file mode 100644
index 1d36198..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention010ITCase extends KafkaShortRetentionTestBase {
-
-	@Test(timeout=60000)
-	public void testAutoOffsetReset() throws Exception {
-		runAutoOffsetResetTest();
-	}
-
-	@Test(timeout=60000)
-	public void testAutoOffsetResetNone() throws Exception {
-		runFailOnAutoOffsetResetNone();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
new file mode 100644
index 0000000..e6e3c51
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Handover is a utility to hand over data (a buffer of records) and exception from a
+ * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a
+ * "size one blocking queue", with some extras around exception reporting, closing, and
+ * waking up threads without {@link Thread#interrupt() interrupting} them.
+ * 
+ * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between
+ * the thread that runs the KafkaConsumer class and the main thread.
+ * 
+ * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException}
+ * rather than a thread interrupt.
+ * 
+ * <p>The Handover can also be "closed", signalling from one thread to the other that
+ * the thread has terminated.
+ */
+@ThreadSafe
+public final class Handover implements Closeable {
+
+	private final Object lock = new Object();
+
+	private ConsumerRecords<byte[], byte[]> next;
+	private Throwable error;
+	private boolean wakeupProducer;
+
+	/**
+	 * Polls the next element from the Handover, possibly blocking until the next element is
+	 * available. This method behaves similar to polling from a blocking queue.
+	 * 
+	 * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then
+	 * that exception is thrown rather than an element being returned.
+	 * 
+	 * @return The next element (buffer of records, never null).
+	 * 
+	 * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
+	 * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
+	 */
+	@Nonnull
+	public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
+		synchronized (lock) {
+			while (next == null && error == null) {
+				lock.wait();
+			}
+
+			ConsumerRecords<byte[], byte[]> n = next;
+			if (n != null) {
+				next = null;
+				lock.notifyAll();
+				return n;
+			}
+			else {
+				ExceptionUtils.rethrowException(error, error.getMessage());
+
+				// this statement cannot be reached since the above method always throws an exception
+				// this is only here to silence the compiler and any warnings
+				return ConsumerRecords.empty(); 
+			}
+		}
+	}
+
+	/**
+	 * Hands over an element from the producer. If the Handover already has an element that was
+	 * not yet picked up by the consumer thread, this call blocks until the consumer picks up that
+	 * previous element.
+	 * 
+	 * <p>This behavior is similar to a "size one" blocking queue.
+	 * 
+	 * @param element The next element to hand over.
+	 * 
+	 * @throws InterruptedException
+	 *                 Thrown, if the thread is interrupted while blocking for the Handover to be empty.
+	 * @throws WakeupException
+	 *                 Thrown, if the {@link #wakeupProducer()} method is called while blocking for
+	 *                 the Handover to be empty.
+	 * @throws ClosedException
+	 *                 Thrown if the Handover was closed or concurrently being closed.
+	 */
+	public void produce(final ConsumerRecords<byte[], byte[]> element)
+			throws InterruptedException, WakeupException, ClosedException {
+
+		checkNotNull(element);
+
+		synchronized (lock) {
+			while (next != null && !wakeupProducer) {
+				lock.wait();
+			}
+
+			wakeupProducer = false;
+
+			// if there is still an element, we must have been woken up
+			if (next != null) {
+				throw new WakeupException();
+			}
+			// if there is no error, then this is open and can accept this element
+			else if (error == null) {
+				next = element;
+				lock.notifyAll();
+			}
+			// an error marks this as closed for the producer
+			else {
+				throw new ClosedException();
+			}
+		}
+	}
+
+	/**
+	 * Reports an exception. The consumer will throw the given exception immediately, if
+	 * it is currently blocked in the {@link #pollNext()} method, or the next time it
+	 * calls that method.
+	 * 
+	 * <p>After this method has been called, no call to either {@link #produce(ConsumerRecords)}
+	 * or {@link #pollNext()} will ever return regularly any more, but will always return
+	 * exceptionally.
+	 * 
+	 * <p>If another exception was already reported, this method does nothing.
+	 * 
+	 * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
+	 * 
+	 * @param t The exception to report.
+	 */
+	public void reportError(Throwable t) {
+		checkNotNull(t);
+
+		synchronized (lock) {
+			// do not override the initial exception
+			if (error == null) {
+				error = t;
+			}
+			next = null;
+			lock.notifyAll();
+		}
+	}
+
+	/**
+	 * Closes the handover. Both the {@link #produce(ConsumerRecords)} method and the
+	 * {@link #pollNext()} will throw a {@link ClosedException} on any currently blocking and
+	 * future invocations.
+	 * 
+	 * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
+	 * that exception will not be overridden. The consumer thread will throw that exception upon
+	 * calling {@link #pollNext()}, rather than the {@code ClosedException}.
+	 */
+	@Override
+	public void close() {
+		synchronized (lock) {
+			next = null;
+			wakeupProducer = false;
+
+			if (error == null) {
+				error = new ClosedException();
+			}
+			lock.notifyAll();
+		}
+	}
+
+	/**
+	 * Wakes the producer thread up. If the producer thread is currently blocked in
+	 * the {@link #produce(ConsumerRecords)} method, it will exit the method throwing
+	 * a {@link WakeupException}.
+	 */
+	public void wakeupProducer() {
+		synchronized (lock) {
+			wakeupProducer = true;
+			lock.notifyAll();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An exception thrown by the Handover in the {@link #pollNext()} or
+	 * {@link #produce(ConsumerRecords)} method, after the Handover was closed via
+	 * {@link #close()}.
+	 */
+	public static final class ClosedException extends Exception {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/**
+	 * A special exception thrown by the Handover in the {@link #produce(ConsumerRecords)}
+	 * method when the producer is woken up from a blocking call via {@link #wakeupProducer()}.
+	 */
+	public static final class WakeupException extends Exception {
+		private static final long serialVersionUID = 1L;
+	}
+}

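For orientation, a minimal usage sketch of the Handover added above. The class name and the empty record batch are illustrative only; in the actual code the producer side is the KafkaConsumerThread and the consumer side is the fetcher's main loop.

    // Hedged usage sketch, not part of the commit. ConsumerRecords.empty() stands in
    // for a real poll() result; error handling is reduced to reportError().
    import org.apache.flink.streaming.connectors.kafka.internal.Handover;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class HandoverUsageSketch {

        public static void main(String[] args) throws Exception {
            final Handover handover = new Handover();

            // producer side: in the real code this is the KafkaConsumerThread
            Thread producer = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        handover.produce(ConsumerRecords.<byte[], byte[]>empty());
                    } catch (Exception e) {
                        handover.reportError(e);
                    }
                }
            });
            producer.start();

            // consumer side: in the real code this is the fetcher's main thread
            ConsumerRecords<byte[], byte[]> batch = handover.pollNext();
            System.out.println("received a batch with " + batch.count() + " records");

            // closing wakes up and terminates both sides without Thread.interrupt()
            handover.close();
            producer.join();
        }
    }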
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index acdcb61..d495327 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,10 +23,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
@@ -34,30 +32,23 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
  * 
  * @param <T> The type of elements produced by the fetcher.
  */
-public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
+public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
 
@@ -66,36 +57,15 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	/** The schema to convert between Kafka's byte messages, and Flink's objects */
 	private final KeyedDeserializationSchema<T> deserializer;
 
-	/** The configuration for the Kafka consumer */
-	private final Properties kafkaProperties;
+	/** The handover of data and exceptions between the consumer thread and the task thread */
+	private final Handover handover;
 
-	/** The maximum number of milliseconds to wait for a fetch batch */
-	private final long pollTimeout;
-
-	/** The next offsets that the main thread should commit */
-	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
-	
-	/** The callback invoked by Kafka once an offset commit is complete */
-	private final OffsetCommitCallback offsetCommitCallback;
-
-	/** Reference to the Kafka consumer, once it is created */
-	private volatile KafkaConsumer<byte[], byte[]> consumer;
-	
-	/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
-	private volatile ExceptionProxy errorHandler;
+	/** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher */
+	private final KafkaConsumerThread consumerThread;
 
 	/** Flag to mark the main work loop as alive */
 	private volatile boolean running = true;
 
-	/** Flag tracking whether the latest commit request has completed */
-	private volatile boolean commitInProgress;
-
-	/** For Debug output **/
-	private String taskNameWithSubtasks;
-
-	/** We get this from the outside to publish metrics. **/
-	private MetricGroup metricGroup;
-
 	// ------------------------------------------------------------------------
 
 	public Kafka09Fetcher(
@@ -125,16 +95,26 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 				useMetrics);
 
 		this.deserializer = deserializer;
-		this.kafkaProperties = kafkaProperties;
-		this.pollTimeout = pollTimeout;
-		this.nextOffsetsToCommit = new AtomicReference<>();
-		this.offsetCommitCallback = new CommitCallback();
-		this.taskNameWithSubtasks = taskNameWithSubtasks;
-		this.metricGroup = metricGroup;
+		this.handover = new Handover();
+
+		final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
+		addOffsetStateGauge(kafkaMetricGroup);
 
 		// if checkpointing is enabled, we are not automatically committing to Kafka.
-		kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+		kafkaProperties.setProperty(
+				ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
 				Boolean.toString(!enableCheckpointing));
+		
+		this.consumerThread = new KafkaConsumerThread(
+				LOG,
+				handover,
+				kafkaProperties,
+				subscribedPartitions(),
+				kafkaMetricGroup,
+				createCallBridge(),
+				getFetcherName() + " for " + taskNameWithSubtasks,
+				pollTimeout,
+				useMetrics);
 	}
 
 	// ------------------------------------------------------------------------
@@ -143,133 +123,26 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 
 	@Override
 	public void runFetchLoop() throws Exception {
-		this.errorHandler = new ExceptionProxy(Thread.currentThread());
-
-		// rather than running the main fetch loop directly here, we spawn a dedicated thread
-		// this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
-		Thread runner = new Thread(this, getFetcherName() + " for " + taskNameWithSubtasks);
-		runner.setDaemon(true);
-		runner.start();
-
 		try {
-			runner.join();
-		} catch (InterruptedException e) {
-			// may be the result of a wake-up after an exception. we ignore this here and only
-			// restore the interruption state
-			Thread.currentThread().interrupt();
-		}
-
-		// make sure we propagate any exception that occurred in the concurrent fetch thread,
-		// before leaving this method
-		this.errorHandler.checkAndThrowException();
-	}
-
-	@Override
-	public void cancel() {
-		// flag the main thread to exit
-		running = false;
-
-		// NOTE:
-		//   - We cannot interrupt the runner thread, because the Kafka consumer may
-		//     deadlock when the thread is interrupted while in certain methods
-		//   - We cannot call close() on the consumer, because it will actually throw
-		//     an exception if a concurrent call is in progress
-
-		// make sure the consumer finds out faster that we are shutting down 
-		if (consumer != null) {
-			consumer.wakeup();
-		}
-	}
-
-	@Override
-	public void run() {
-		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
-		// This is important, because the consumer has multi-threading issues,
-		// including concurrent 'close()' calls.
-
-		final KafkaConsumer<byte[], byte[]> consumer;
-		try {
-			consumer = new KafkaConsumer<>(kafkaProperties);
-		}
-		catch (Throwable t) {
-			running = false;
-			errorHandler.reportError(t);
-			return;
-		}
-
-		// from here on, the consumer will be closed properly
-		try {
-			assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));
-
-			if (useMetrics) {
-				final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
-				addOffsetStateGauge(kafkaMetricGroup);
-				// register Kafka metrics to Flink
-				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
-				if (metrics == null) {
-					// MapR's Kafka implementation returns null here.
-					LOG.info("Consumer implementation does not support metrics");
-				} else {
-					// we have Kafka metrics, register them
-					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
-					}
-				}
-			}
-
-			// seek the consumer to the initial offsets
-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-				if (partition.isOffsetDefined()) {
-					LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
-						"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
-
-					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-				} else {
-					// for partitions that do not have offsets restored from a checkpoint/savepoint,
-					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
-					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
-
-					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
-
-					LOG.info("Partition {} has no initial offset; the consumer has position {}, so the initial offset " +
-						"will be set to {}", partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
-
-					// the fetched offset represents the next record to process, so we need to subtract it by 1
-					partition.setOffset(fetchedOffset - 1);
-				}
-			}
+			final Handover handover = this.handover;
 
-			// from now on, external operations may call the consumer
-			this.consumer = consumer;
+			// kick off the actual Kafka consumer
+			consumerThread.start();
 
-			// main fetch loop
 			while (running) {
-
-				// check if there is something to commit
-				final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
-				if (toCommit != null && !commitInProgress) {
-					// reset the work-to-be committed, so we don't repeatedly commit the same
-					// also record that a commit is already in progress
-					commitInProgress = true;
-					consumer.commitAsync(toCommit, offsetCommitCallback);
-				}
-
-				// get the next batch of records
-				final ConsumerRecords<byte[], byte[]> records;
-				try {
-					records = consumer.poll(pollTimeout);
-				}
-				catch (WakeupException we) {
-					continue;
-				}
+				// this blocks until we get the next records
+				// it automatically re-throws exceptions encountered in the fetcher thread
+				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
 
 				// get the records for each topic partition
 				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-					
-					List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());
+
+					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
+							records.records(partition.getKafkaPartitionHandle());
 
 					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-						T value = deserializer.deserialize(
+
+						final T value = deserializer.deserialize(
 								record.key(), record.value(),
 								record.topic(), record.partition(), record.offset());
 
@@ -279,32 +152,37 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 							break;
 						}
 
-						// emit the actual record. this also update offset state atomically
+						// emit the actual record. this also updates offset state atomically
 						// and deals with timestamps and watermark generation
 						emitRecord(value, partition, record.offset(), record);
 					}
 				}
 			}
-			// end main fetch loop
-		}
-		catch (Throwable t) {
-			if (running) {
-				running = false;
-				errorHandler.reportError(t);
-			} else {
-				LOG.debug("Stopped ConsumerThread threw exception", t);
-			}
 		}
 		finally {
-			try {
-				consumer.close();
-			}
-			catch (Throwable t) {
-				LOG.warn("Error while closing Kafka 0.9 consumer", t);
-			}
+			// this signals the consumer thread that no more work is to be done
+			consumerThread.shutdown();
+		}
+
+		// on a clean exit, wait for the runner thread
+		try {
+			consumerThread.join();
+		}
+		catch (InterruptedException e) {
+			// may be the result of a wake-up interruption after an exception.
+			// we ignore this here and only restore the interruption state
+			Thread.currentThread().interrupt();
 		}
 	}
 
+	@Override
+	public void cancel() {
+		// flag the main thread to exit. A thread interrupt will come anyways.
+		running = false;
+		handover.close();
+		consumerThread.shutdown();
+	}
+
 	// ------------------------------------------------------------------------
 	//  The below methods are overridden in the 0.10 fetcher, which otherwise
 	//   reuses most of the 0.9 fetcher behavior
@@ -320,14 +198,17 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		emitRecord(record, partition, offset);
 	}
 
-	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
-		consumer.assign(topicPartitions);
-	}
-
+	/**
+	 * Gets the name of this fetcher, for thread naming and logging purposes.
+	 */
 	protected String getFetcherName() {
 		return "Kafka 0.9 Fetcher";
 	}
 
+	protected KafkaConsumerCallBridge createCallBridge() {
+		return new KafkaConsumerCallBridge();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Implement Methods of the AbstractFetcher
 	// ------------------------------------------------------------------------
@@ -355,37 +236,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		}
 
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
-		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
-			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
-					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
-					"This does not compromise Flink's checkpoint integrity.");
-		}
-		if (consumer != null) {
-			consumer.wakeup();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	public static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
-		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
-		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
-			result.add(p.getKafkaPartitionHandle());
-		}
-		return result;
-	}
-
-	private class CommitCallback implements OffsetCommitCallback {
-
-		@Override
-		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
-			commitInProgress = false;
-
-			if (ex != null) {
-				LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
-			}
-		}
+		consumerThread.setOffsetsToCommit(offsetsToCommit);
 	}
 }

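The rewritten cancel() above deliberately avoids Thread.interrupt(), because the Kafka consumer has been seen to deadlock when interrupted. A generic sketch of that cooperative-shutdown pattern, with hypothetical names; the real code wakes blocked calls via consumer.wakeup() and the Handover, while this sketch uses a timed poll to stay self-contained.

    // Hedged, generic illustration only; not taken from the Flink sources.
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class CooperativeWorker extends Thread {

        private final BlockingQueue<String> work = new ArrayBlockingQueue<>(1);
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                try {
                    // poll with a timeout so the flag is re-checked even if no work arrives
                    String item = work.poll(50, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        System.out.println("processing " + item);
                    }
                } catch (InterruptedException e) {
                    // shutdown() never interrupts this thread; if someone else does,
                    // restore the interruption flag and exit cleanly
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        public void offer(String item) {
            work.offer(item);
        }

        public void shutdown() {
            // no Thread.interrupt(): the timed poll observes the flag shortly afterwards
            running = false;
        }
    }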
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
new file mode 100644
index 0000000..c17aae6
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ * for example changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against different method signatures.
+ * Even though the source of subclasses may look identical, the byte code will be different, because they
+ * are compiled against different dependencies.
+ */
+public class KafkaConsumerCallBridge {
+
+	public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
+		consumer.assign(topicPartitions);
+	}
+}

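Since Kafka changed assign(List) to assign(Collection) between 0.9 and 0.10, the bridge above is meant to be subclassed per connector version. A sketch of what a 0.10-specific bridge might look like when compiled against the 0.10 client; the class name here is hypothetical, and the actual class in the Flink tree may differ.

    // Hedged sketch only. Compiled against the 0.10 client, the call below resolves
    // to the new assign(Collection<TopicPartition>) signature, even though the source
    // is identical to the 0.9 bridge.
    import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.List;

    public class KafkaConsumerCallBridge010Sketch extends KafkaConsumerCallBridge {

        @Override
        public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
            // same source as the 0.9 bridge, but the byte code targets assign(Collection)
            consumer.assign(topicPartitions);
        }
    }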
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
new file mode 100644
index 0000000..9cfa840
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
+ * deserialize and emit the records.
+ * 
+ * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, and to even
+ * deadlock in certain situations.
+ * 
+ * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+	/** Logger for this consumer */
+	private final Logger log;
+
+	/** The handover of data and exceptions between the consumer thread and the task thread */
+	private final Handover handover;
+
+	/** The next offsets that the main thread should commit */
+	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+	/** The configuration for the Kafka consumer */
+	private final Properties kafkaProperties;
+
+	/** The partitions that this consumer reads from */ 
+	private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
+
+	/** We get this from the outside to publish metrics. **/
+	private final MetricGroup kafkaMetricGroup;
+
+	/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
+	private final KafkaConsumerCallBridge consumerCallBridge;
+
+	/** The maximum number of milliseconds to wait for a fetch batch */
+	private final long pollTimeout;
+
+	/** Flag whether to add Kafka's metrics to the Flink metrics */
+	private final boolean useMetrics;
+
+	/** Reference to the Kafka consumer, once it is created */
+	private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+	/** Flag to mark the main work loop as alive */
+	private volatile boolean running;
+
+	/** Flag tracking whether the latest commit request has completed */
+	private volatile boolean commitInProgress;
+
+
+	public KafkaConsumerThread(
+			Logger log,
+			Handover handover,
+			Properties kafkaProperties,
+			KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
+			MetricGroup kafkaMetricGroup,
+			KafkaConsumerCallBridge consumerCallBridge,
+			String threadName,
+			long pollTimeout,
+			boolean useMetrics) {
+
+		super(threadName);
+		setDaemon(true);
+
+		this.log = checkNotNull(log);
+		this.handover = checkNotNull(handover);
+		this.kafkaProperties = checkNotNull(kafkaProperties);
+		this.subscribedPartitions = checkNotNull(subscribedPartitions);
+		this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+		this.consumerCallBridge = checkNotNull(consumerCallBridge);
+		this.pollTimeout = pollTimeout;
+		this.useMetrics = useMetrics;
+
+		this.nextOffsetsToCommit = new AtomicReference<>();
+		this.running = true;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void run() {
+		// early exit check
+		if (!running) {
+			return;
+		}
+
+		// this is the means to talk to FlinkKafkaConsumer's main thread
+		final Handover handover = this.handover;
+
+		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
+		// This is important, because the consumer has multi-threading issues,
+		// including concurrent 'close()' calls.
+		final KafkaConsumer<byte[], byte[]> consumer;
+		try {
+			consumer = new KafkaConsumer<>(kafkaProperties);
+		}
+		catch (Throwable t) {
+			handover.reportError(t);
+			return;
+		}
+
+		// from here on, the consumer is guaranteed to be closed properly
+		try {
+			// The callback invoked by Kafka once an offset commit is complete
+			final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
+
+			// tell the consumer which partitions to work with
+			consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
+
+			// register Kafka's very own metrics in Flink's metric reporters
+			if (useMetrics) {
+				// register Kafka metrics to Flink
+				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+				if (metrics == null) {
+					// MapR's Kafka implementation returns null here.
+					log.info("Consumer implementation does not support metrics");
+				} else {
+					// we have Kafka metrics, register them
+					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+					}
+				}
+			}
+
+			// early exit check
+			if (!running) {
+				return;
+			}
+
+			// seek the consumer to the initial offsets
+			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
+				if (partition.isOffsetDefined()) {
+					log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
+							"seeking the consumer to position {}",
+							partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
+
+					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+				}
+				else {
+					// for partitions that do not have offsets restored from a checkpoint/savepoint,
+					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
+					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
+
+					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
+
+					log.info("Partition {} has no initial offset; the consumer has position {}, " +
+							"so the initial offset will be set to {}",
+							partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+					// the fetched offset represents the next record to process, so we need to subtract it by 1
+					partition.setOffset(fetchedOffset - 1);
+				}
+			}
+
+			// from now on, external operations may call the consumer
+			this.consumer = consumer;
+
+			// the latest batch of records; it may carry over to the next loop iteration
+			// if the thread is woken up from blocking on the handover
+			ConsumerRecords<byte[], byte[]> records = null;
+
+			// main fetch loop
+			while (running) {
+
+				// check if there is something to commit
+				if (!commitInProgress) {
+					// get and reset the work-to-be committed, so we don't repeatedly commit the same
+					final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+
+					if (toCommit != null) {
+						log.debug("Sending async offset commit request to Kafka broker");
+
+						// also record that a commit is already in progress
+						// the order here matters! first set the flag, then send the commit command.
+						commitInProgress = true;
+						consumer.commitAsync(toCommit, offsetCommitCallback);
+					}
+				}
+
+				// get the next batch of records, unless we did not manage to hand the old batch over
+				if (records == null) {
+					try {
+						records = consumer.poll(pollTimeout);
+					}
+					catch (WakeupException we) {
+						continue;
+					}
+				}
+
+				try {
+					handover.produce(records);
+					records = null;
+				}
+				catch (Handover.WakeupException e) {
+					// fall through the loop
+				}
+			}
+			// end main fetch loop
+		}
+		catch (Throwable t) {
+			// let the main thread know and exit
+			// it may be that this exception comes because the main thread closed the handover, in
+			// which case the below reporting is irrelevant, but does not hurt either
+			handover.reportError(t);
+		}
+		finally {
+			// make sure the handover is closed if it is not already closed or has an error
+			handover.close();
+
+			// make sure the KafkaConsumer is closed
+			try {
+				consumer.close();
+			}
+			catch (Throwable t) {
+				log.warn("Error while closing Kafka consumer", t);
+			}
+		}
+	}
+
+	/**
+	 * Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls).
+	 */
+	public void shutdown() {
+		running = false;
+
+		// We cannot call close() on the KafkaConsumer, because it will actually throw
+		// an exception if a concurrent call is in progress
+
+		// this wakes up the consumer if it is blocked handing over records
+		handover.wakeupProducer();
+
+		// this wakes up the consumer if it is blocked in a Kafka poll
+		if (consumer != null) {
+			consumer.wakeup();
+		}
+	}
+
+	/**
+	 * Tells this thread to commit a set of offsets. This method does not block, the committing
+	 * operation will happen asynchronously.
+	 * 
+	 * <p>Only one commit operation may be pending at any time. If committing takes longer than
+	 * the interval between calls to this method, some commits may be skipped because they are
+	 * superseded by newer ones.
+	 * 
+	 * @param offsetsToCommit The offsets to commit
+	 */
+	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+		// record the work to be committed by the main consumer thread and make sure the consumer notices that
+		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
+					"This does not compromise Flink's checkpoint integrity.");
+		}
+
+		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
+		handover.wakeupProducer();
+		if (consumer != null) {
+			consumer.wakeup();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+			result.add(p.getKafkaPartitionHandle());
+		}
+		return result;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private class CommitCallback implements OffsetCommitCallback {
+
+		@Override
+		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
+			commitInProgress = false;
+
+			if (ex != null) {
+				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+			}
+		}
+	}
+}

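The setOffsetsToCommit() contract above (at most one pending commit, with newer offsets superseding an unsent older set) rests on an AtomicReference swap. A standalone sketch of that "latest request wins" hand-off, with hypothetical names and plain String keys instead of Kafka's TopicPartition.

    // Hedged illustration; names and types are illustrative, not from the Flink sources.
    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;

    public class PendingCommitSketch {

        // holds at most one pending commit request; a newer request replaces an unsent older one
        private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

        /** Called by the checkpointing thread: the newest offsets win. */
        public void setOffsetsToCommit(Map<String, Long> offsets) {
            if (nextOffsetsToCommit.getAndSet(offsets) != null) {
                System.out.println("previous offsets were superseded before being sent");
            }
        }

        /** Called by the polling thread: atomically takes whatever is pending, if anything. */
        public Map<String, Long> takePendingCommit() {
            Map<String, Long> toCommit = nextOffsetsToCommit.getAndSet(null);
            return toCommit != null ? toCommit : Collections.<String, Long>emptyMap();
        }
    }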
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 1162599..7a82365 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -44,6 +48,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -53,6 +58,7 @@ import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -68,7 +74,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
  * Unit tests for the {@link Kafka09Fetcher}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(Kafka09Fetcher.class)
+@PrepareForTest(KafkaConsumerThread.class)
 public class Kafka09FetcherTest {
 
 	@Test
@@ -124,7 +130,7 @@ public class Kafka09FetcherTest {
 				this.getClass().getClassLoader(),
 				true, /* checkpointing */
 				"task_name",
-				mock(MetricGroup.class),
+				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
@@ -174,7 +180,7 @@ public class Kafka09FetcherTest {
 
 		// check that there were no errors in the fetcher
 		final Throwable fetcherError = error.get();
-		if (fetcherError != null) {
+		if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
 			throw new Exception("Exception in the fetcher", fetcherError);
 		}
 		final Throwable committerError = commitError.get();
@@ -260,7 +266,7 @@ public class Kafka09FetcherTest {
 				this.getClass().getClassLoader(),
 				true, /* checkpointing */
 				"task_name",
-				mock(MetricGroup.class),
+				new UnregisteredMetricsGroup(),
 				schema,
 				new Properties(),
 				0L,
@@ -323,8 +329,154 @@ public class Kafka09FetcherTest {
 
 		// check that there were no errors in the fetcher
 		final Throwable caughtError = error.get();
-		if (caughtError != null) {
+		if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
 			throw new Exception("Exception in the fetcher", caughtError);
 		}
 	}
+
+	@Test
+	public void testCancellationWhenEmitBlocks() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+				sourceContext,
+				topics,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				new UnregisteredMetricsGroup(),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {
+
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetchLoop();
+				} catch (Throwable t) {
+					error.set(t);
+				}
+			}
+		};
+		fetcherRunner.start();
+
+		// wait until the thread started to emit records to the source context
+		sourceContext.waitTillHasBlocker();
+
+		// now we try to cancel the fetcher, including the interruption usually done on the task thread
+		// once it has finished, there must be no more thread blocked on the source context
+		fetcher.cancel();
+		fetcherRunner.interrupt();
+		fetcherRunner.join();
+
+		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utilities
+	// ------------------------------------------------------------------------
+
+	private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+		private final ReentrantLock lock = new ReentrantLock();
+		private final OneShotLatch inBlocking = new OneShotLatch();
+
+		@Override
+		public void collect(T element) {
+			block();
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			block();
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			block();
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return new Object();
+		}
+
+		@Override
+		public void close() {}
+
+		public void waitTillHasBlocker() throws InterruptedException {
+			inBlocking.await();
+		}
+
+		public boolean isStillBlocking() {
+			return lock.isLocked();
+		}
+
+		@SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+		private void block() {
+			lock.lock();
+			try {
+				inBlocking.trigger();
+
+				// put this thread to sleep indefinitely
+				final Object o = new Object();
+				while (true) {
+					synchronized (o) {
+						o.wait();
+					}
+				}
+			}
+			catch (InterruptedException e) {
+				// exit cleanly, simply reset the interruption flag
+				Thread.currentThread().interrupt();
+			}
+			finally {
+				lock.unlock();
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
deleted file mode 100644
index c1b21b7..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
-
-	@Test(timeout=60000)
-	public void testAutoOffsetReset() throws Exception {
-		runAutoOffsetResetTest();
-	}
-
-	@Test(timeout=60000)
-	public void testAutoOffsetResetNone() throws Exception {
-		runFailOnAutoOffsetResetNone();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
new file mode 100644
index 0000000..25040eb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between the Kafka consumer thread and the fetcher's main thread.
+ */
+public class HandoverTest {
+
+	// ------------------------------------------------------------------------
+	//  test producer / consumer
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testWithVariableProducer() throws Exception {
+		runProducerConsumerTest(500, 2, 0);
+	}
+
+	@Test
+	public void testWithVariableConsumer() throws Exception {
+		runProducerConsumerTest(500, 0, 2);
+	}
+
+	@Test
+	public void testWithVariableBoth() throws Exception {
+		runProducerConsumerTest(500, 2, 2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test error propagation
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testPublishErrorOnEmptyHandover() throws Exception {
+		final Handover handover = new Handover();
+
+		Exception error = new Exception();
+		handover.reportError(error);
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Exception e) {
+			assertEquals(error, e);
+		}
+	}
+
+	@Test
+	public void testPublishErrorOnFullHandover() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		IOException error = new IOException();
+		handover.reportError(error);
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Exception e) {
+			assertEquals(error, e);
+		}
+	}
+
+	@Test
+	public void testExceptionMarksClosedOnEmpty() throws Exception {
+		final Handover handover = new Handover();
+
+		IllegalStateException error = new IllegalStateException();
+		handover.reportError(error);
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testExceptionMarksClosedOnFull() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		LinkageError error = new LinkageError();
+		handover.reportError(error);
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test closing behavior
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testCloseEmptyForConsumer() throws Exception {
+		final Handover handover = new Handover();
+		handover.close();
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCloseFullForConsumer() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+		handover.close();
+
+		try {
+			handover.pollNext();
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCloseEmptyForProducer() throws Exception {
+		final Handover handover = new Handover();
+		handover.close();
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testCloseFullForProducer() throws Exception {
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+		handover.close();
+
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.ClosedException e) {
+			// expected
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test wake up behavior
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+		Handover handover = new Handover();
+		handover.wakeupProducer();
+
+		// produce into a woken but empty handover
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (Handover.WakeupException e) {
+			fail();
+		}
+
+		// the handover now holds records; the next wakeup followed by a produce()
+		// call must throw a WakeupException
+		handover.wakeupProducer();
+		try {
+			handover.produce(createTestRecords());
+			fail("should throw an exception");
+		}
+		catch (Handover.WakeupException e) {
+			// expected
+		}
+
+		// empty the handover
+		assertNotNull(handover.pollNext());
+		
+		// producing into an empty handover should work
+		try {
+			handover.produce(createTestRecords());
+		}
+		catch (Handover.WakeupException e) {
+			fail();
+		}
+	}
+
+	@Test
+	public void testWakeupWakesOnlyOnce() throws Exception {
+		// create a full handover
+		final Handover handover = new Handover();
+		handover.produce(createTestRecords());
+
+		handover.wakeupProducer();
+
+		try {
+			handover.produce(createTestRecords());
+			fail();
+		} catch (WakeupException e) {
+			// expected
+		}
+
+		CheckedThread producer = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				handover.produce(createTestRecords());
+			}
+		};
+		producer.start();
+
+		// the producer must block on the full handover
+		producer.waitUntilThreadHoldsLock(10000);
+
+		// release the thread by consuming something
+		assertNotNull(handover.pollNext());
+		producer.sync();
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
+		// generate test data
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final ConsumerRecords<byte[], byte[]>[] data = new ConsumerRecords[numRecords];
+		for (int i = 0; i < numRecords; i++) {
+			data[i] = createTestRecords();
+		}
+
+		final Handover handover = new Handover();
+
+		ProducerThread producer = new ProducerThread(handover, data, maxProducerDelay);
+		ConsumerThread consumer = new ConsumerThread(handover, data, maxConsumerDelay);
+
+		consumer.start();
+		producer.start();
+
+		// sync first on the consumer, so it propagates assertion errors
+		consumer.sync();
+		producer.sync();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static ConsumerRecords<byte[], byte[]> createTestRecords() {
+		return mock(ConsumerRecords.class);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static abstract class CheckedThread extends Thread {
+
+		private volatile Throwable error;
+
+		public abstract void go() throws Exception;
+
+		@Override
+		public void run() {
+			try {
+				go();
+			}
+			catch (Throwable t) {
+				error = t;
+			}
+		}
+
+		public void sync() throws Exception {
+			join();
+			if (error != null) {
+				ExceptionUtils.rethrowException(error, error.getMessage());
+			}
+		}
+
+		public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
+			final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+			
+			while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
+				Thread.sleep(1);
+			}
+
+			if (!isBlockedOrWaiting()) {
+				throw new TimeoutException();
+			}
+		}
+
+		private boolean isBlockedOrWaiting() {
+			State state = getState();
+			return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING;
+		}
+	}
+
+	private static class ProducerThread extends CheckedThread {
+
+		private final Random rnd = new Random();
+		private final Handover handover;
+		private final ConsumerRecords<byte[], byte[]>[] data;
+		private final int maxDelay;
+
+		private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+			this.handover = handover;
+			this.data = data;
+			this.maxDelay = maxDelay;
+		}
+
+		@Override
+		public void go() throws Exception {
+			for (ConsumerRecords<byte[], byte[]> rec : data) {
+				handover.produce(rec);
+
+				if (maxDelay > 0) {
+					int delay = rnd.nextInt(maxDelay);
+					Thread.sleep(delay);
+				}
+			}
+		}
+	}
+
+	private static class ConsumerThread extends CheckedThread {
+
+		private final Random rnd = new Random();
+		private final Handover handover;
+		private final ConsumerRecords<byte[], byte[]>[] data;
+		private final int maxDelay;
+
+		private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+			this.handover = handover;
+			this.data = data;
+			this.maxDelay = maxDelay;
+		}
+
+		@Override
+		public void go() throws Exception {
+			for (ConsumerRecords<byte[], byte[]> rec : data) {
+				ConsumerRecords<byte[], byte[]> next = handover.pollNext();
+
+				assertEquals(rec, next);
+
+				if (maxDelay > 0) {
+					int delay = rnd.nextInt(maxDelay);
+					Thread.sleep(delay);
+				}
+			}
+		}
+	}
+}

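For orientation, here is a minimal sketch of the hand-off pattern these tests exercise: one thread produces full poll results into the Handover, the fetcher's main thread takes them out one at a time, and errors and shutdown are signalled through the same object. The sketch is illustrative only and not part of this commit; the class name, the empty-records placeholder, and the fixed batch count are assumptions, and only the Handover methods exercised in the tests above (produce, pollNext, reportError, close) are relied on.

import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class HandoverUsageSketch {

	public static void main(String[] args) throws Exception {
		final Handover handover = new Handover();

		// consumer-thread side: hands over one poll result at a time
		Thread kafkaConsumerThread = new Thread(() -> {
			try {
				while (true) {
					// placeholder for a real KafkaConsumer.poll(...) call
					ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();

					// blocks until the previous element has been taken via pollNext()
					handover.produce(records);
				}
			}
			catch (Handover.ClosedException e) {
				// the fetcher side closed the handover - shut down quietly
			}
			catch (Throwable t) {
				// any other failure is forwarded and re-thrown from the fetcher's pollNext()
				handover.reportError(t);
			}
		});
		kafkaConsumerThread.start();

		// fetcher main-thread side: pollNext() blocks until an element or an error is available
		try {
			for (int i = 0; i < 10; i++) {
				ConsumerRecords<byte[], byte[]> batch = handover.pollNext();
				// the batch would be deserialized and emitted here
				System.out.println("received batch with " + batch.count() + " records");
			}
		}
		finally {
			// closing unblocks both sides; the producing thread sees a ClosedException
			handover.close();
		}
	}
}
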
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 5c03b78..dccf698 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -122,6 +122,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	 *
 	 */
 	private static boolean stopProducer = false;
+
 	public void runAutoOffsetResetTest() throws Exception {
 		final String topic = "auto-offset-reset-test";
 


[3/5] flink git commit: [hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka 0.9/0.10

Posted by se...@apache.org.
[hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka 0.9/0.10


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa1864c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa1864c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa1864c7

Branch: refs/heads/master
Commit: fa1864c7a6eadea55eb2d7e8fd2b72e043841671
Parents: 611412c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 9 17:58:54 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |  6 ++
 .../kafka/internal/Kafka010Fetcher.java         | 39 +++++--------
 .../connectors/kafka/Kafka010FetcherTest.java   |  1 -
 .../kafka/internals/SimpleConsumerThread.java   |  2 +-
 .../kafka/internal/Kafka09Fetcher.java          | 25 +++++---
 .../kafka/internals/AbstractFetcher.java        | 60 ++++++++++++++------
 .../AbstractFetcherTimestampsTest.java          | 53 +++++++++--------
 7 files changed, 107 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index 8108afc..04019f8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -48,6 +48,12 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
 			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<!-- Add Kafka 0.10.x as a dependency -->

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 4a1f5f6..024cd38 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -38,6 +38,9 @@ import java.util.Properties;
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
  * 
+ * <p>This fetcher re-uses essentially all functionality of the 0.9 fetcher. In addition, it
+ * takes the timestamp attached to each Kafka record and attaches it to the emitted Flink records.
+ * 
  * @param <T> The type of elements produced by the fetcher.
  */
 public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
@@ -76,37 +79,23 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 	}
 
 	@Override
-	protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
-		consumer.assign(topicPartitions);
-	}
+	protected void emitRecord(
+			T record,
+			KafkaTopicPartitionState<TopicPartition> partition,
+			long offset,
+			ConsumerRecord<?, ?> consumerRecord) throws Exception {
 
-	@Override
-	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
-		// get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x)
-		super.emitRecord(record, partition, offset, consumerRecord.timestamp());
+		// we attach the Kafka 0.10 timestamp here
+		emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
 	}
 
 	/**
-	 * Emit record Kafka-timestamp aware.
+	 * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
+	 * changing the List in the signature to a Collection.
 	 */
 	@Override
-	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
-		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-			// fast path logic, in case there are no watermarks
-
-			// emit the record, using the checkpoint lock to guarantee
-			// atomicity of record emission and offset state update
-			synchronized (checkpointLock) {
-				sourceContext.collectWithTimestamp(record, timestamp);
-				partitionState.setOffset(offset);
-			}
-		}
-		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
-		}
-		else {
-			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
-		}
+	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
+		consumer.assign(topicPartitions);
 	}
 
 	@Override

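The reason for routing this one-line call through an overridable method, as the javadoc above notes, is that Kafka changed the parameter of assign() from a List in 0.9 to a Collection in 0.10, which breaks binary (though not source) compatibility. A minimal sketch of the isolated call site, with an illustrative class name that is not part of the patch:

import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class PartitionAssignmentSketch {

	// Kafka 0.9.x:  public void assign(List<TopicPartition> partitions)
	// Kafka 0.10.x: public void assign(Collection<TopicPartition> partitions)
	//
	// The call below binds to whichever signature is on the compile-time classpath.
	// A fetcher compiled against the 0.9 client would fail with a NoSuchMethodError
	// when run against the 0.10 client, so each fetcher version compiles its own copy.
	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
		consumer.assign(topicPartitions);
	}
}
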
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 718db48..037d25b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -114,7 +114,6 @@ public class Kafka010FetcherTest {
         SourceContext<String> sourceContext = mock(SourceContext.class);
         List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
         KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-        StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
                 sourceContext,

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 1302348..35e491a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread {
 								continue partitionsLoop;
 							}
 							
-							owner.emitRecord(value, currentPartition, offset, Long.MIN_VALUE);
+							owner.emitRecord(value, currentPartition, offset);
 						}
 						else {
 							// no longer running

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index a8c0397..acdcb61 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -201,7 +201,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		try {
 			assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));
 
-
 			if (useMetrics) {
 				final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
 				addOffsetStateGauge(kafkaMetricGroup);
@@ -306,14 +305,22 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		}
 	}
 
-	// Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
-	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
-		emitRecord(record, partition, offset, Long.MIN_VALUE);
+	// ------------------------------------------------------------------------
+	//  The methods below are overridden in the 0.10 fetcher, which otherwise
+	//  reuses most of the 0.9 fetcher's behavior
+	// ------------------------------------------------------------------------
+
+	protected void emitRecord(
+			T record,
+			KafkaTopicPartitionState<TopicPartition> partition,
+			long offset,
+			@SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception {
+
+		// the 0.9 Fetcher does not try to extract a timestamp
+		emitRecord(record, partition, offset);
 	}
-	/**
-	 * Protected method to make the partition assignment pluggable, for different Kafka versions.
-	 */
-	protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+
+	protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) {
 		consumer.assign(topicPartitions);
 	}
 
@@ -322,7 +329,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	}
 
 	// ------------------------------------------------------------------------
-	//  Kafka 0.9 specific fetcher behavior
+	//  Implement Methods of the AbstractFetcher
 	// ------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 3350b06..cf39606 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -205,32 +205,60 @@ public abstract class AbstractFetcher<T, KPH> {
 			}
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  emitting records
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Emits a record without attaching an existing timestamp to it.
+	 * 
 	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
 	 * That makes the fast path efficient, the extended paths are called as separate methods.
+	 * 
 	 * @param record The record to emit
 	 * @param partitionState The state of the Kafka partition from which the record was fetched
 	 * @param offset The offset of the record
-	 * @param timestamp The record's event-timestamp
 	 */
-	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
+	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
 		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
 			// fast path logic, in case there are no watermarks
 
 			// emit the record, using the checkpoint lock to guarantee
 			// atomicity of record emission and offset state update
 			synchronized (checkpointLock) {
-				if(timestamp != Long.MIN_VALUE) {
-					// this case is true for Kafka 0.10
-					sourceContext.collectWithTimestamp(record, timestamp);
-				} else {
-					sourceContext.collect(record);
-				}
+				sourceContext.collect(record);
+				partitionState.setOffset(offset);
+			}
+		}
+		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
+		}
+		else {
+			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
+		}
+	}
+
+	/**
+	 * Emits a record attaching a timestamp to it.
+	 *
+	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+	 * That makes the fast path efficient; the extended paths are called as separate methods.
+	 *
+	 * @param record The record to emit
+	 * @param partitionState The state of the Kafka partition from which the record was fetched
+	 * @param offset The offset of the record
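+	 * @param timestamp The timestamp to attach to the emitted record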
+	 */
+	protected void emitRecordWithTimestamp(
+			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
+
+		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+			// fast path logic, in case there are no watermarks generated in the fetcher
+
+			// emit the record, using the checkpoint lock to guarantee
+			// atomicity of record emission and offset state update
+			synchronized (checkpointLock) {
+				sourceContext.collectWithTimestamp(record, timestamp);
 				partitionState.setOffset(offset);
 			}
 		}
@@ -285,14 +313,14 @@ public abstract class AbstractFetcher<T, KPH> {
 		// from the punctuated extractor
 		final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
 		final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-			
+
 		// emit the record with timestamp, using the usual checkpoint lock to guarantee
 		// atomicity of record emission and offset state update 
 		synchronized (checkpointLock) {
 			sourceContext.collectWithTimestamp(record, timestamp);
 			partitionState.setOffset(offset);
 		}
-		
+
 		// if we also have a new per-partition watermark, check if that is also a
 		// new cross-partition watermark
 		if (newWatermark != null) {
@@ -306,7 +334,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
 		if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
 			long newMin = Long.MAX_VALUE;
-			
+
 			for (KafkaTopicPartitionState<?> state : allPartitions) {
 				@SuppressWarnings("unchecked")
 				final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
@@ -314,7 +342,7 @@ public abstract class AbstractFetcher<T, KPH> {
 				
 				newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
 			}
-			
+
 			// double-check locking pattern
 			if (newMin > maxWatermarkSoFar) {
 				synchronized (checkpointLock) {
@@ -416,7 +444,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		// add current offsets to gauge
 		MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
 		MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
-		for(KafkaTopicPartitionState ktp: subscribedPartitions()){
+		for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
 			currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
 			committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
 		}
@@ -435,10 +463,10 @@ public abstract class AbstractFetcher<T, KPH> {
 	 */
 	private static class OffsetGauge implements Gauge<Long> {
 
-		private final KafkaTopicPartitionState ktp;
+		private final KafkaTopicPartitionState<?> ktp;
 		private final OffsetGaugeType gaugeType;
 
-		public OffsetGauge(KafkaTopicPartitionState ktp, OffsetGaugeType gaugeType) {
+		public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
 			this.ktp = ktp;
 			this.gaugeType = gaugeType;
 		}

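Both emit paths above protect the same invariant: the element must be handed to the SourceContext and the partition offset must be advanced atomically under the checkpoint lock, so that a checkpoint never records an offset whose element was not emitted, or the reverse. Below is a stripped-down sketch of that pattern, with placeholder types standing in for SourceContext and KafkaTopicPartitionState and with the watermark branches omitted; it is an illustration, not the Flink implementation.

class EmitUnderCheckpointLockSketch<T> {

	/** Placeholder for the relevant part of Flink's SourceContext. */
	interface Collector<T> {
		void collect(T record);                              // element without a timestamp (0.9 path)
		void collectWithTimestamp(T record, long timestamp); // element with the Kafka 0.10 record timestamp
	}

	/** Placeholder for the per-partition offset state. */
	static final class PartitionState {
		long offset;
	}

	private final Object checkpointLock = new Object();

	void emitRecord(Collector<T> out, T record, PartitionState partition, long offset) {
		// emission and offset update happen atomically with respect to checkpoints
		synchronized (checkpointLock) {
			out.collect(record);
			partition.offset = offset;
		}
	}

	void emitRecordWithTimestamp(Collector<T> out, T record, PartitionState partition, long offset, long timestamp) {
		synchronized (checkpointLock) {
			out.collectWithTimestamp(record, timestamp);
			partition.offset = offset;
		}
	}
}
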
http://git-wip-us.apache.org/repos/asf/flink/blob/fa1864c7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 5801c24..0b3507a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
@@ -67,22 +66,22 @@ public class AbstractFetcherTimestampsTest {
 		// elements generate a watermark if the timestamp is a multiple of three
 		
 		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
-		fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
-		fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
 		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
 		assertFalse(sourceContext.hasWatermark());
 
 		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(12L, part2, 1L);
 		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
 		assertFalse(sourceContext.hasWatermark());
 
 		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
-		fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
 		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
 		
@@ -91,25 +90,25 @@ public class AbstractFetcherTimestampsTest {
 		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
 		
 		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
-		fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
-		fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
 		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
 
 		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+		fetcher.emitRecord(30L, part1, 4L);
 		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
 		assertTrue(sourceContext.hasWatermark());
 		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(13L, part2, 2L);
 		assertFalse(sourceContext.hasWatermark());
-		fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(14L, part2, 3L);
 		assertFalse(sourceContext.hasWatermark());
-		fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(15L, part2, 3L);
 		assertTrue(sourceContext.hasWatermark());
 		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
 	}
@@ -141,20 +140,20 @@ public class AbstractFetcherTimestampsTest {
 		// elements generate a watermark if the timestamp is a multiple of three
 
 		// elements for partition 1
-		fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
-		fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
-		fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
 		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
 
 		// elements for partition 2
-		fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(12L, part2, 1L);
 		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
 
 		// elements for partition 3
-		fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
-		fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
 		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
 
@@ -164,14 +163,14 @@ public class AbstractFetcherTimestampsTest {
 		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 3
-		fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
-		fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
-		fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
 		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
 
 		// advance partition 1 beyond partition 2 - this bumps the watermark
-		fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+		fetcher.emitRecord(30L, part1, 4L);
 		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
 
@@ -181,9 +180,9 @@ public class AbstractFetcherTimestampsTest {
 		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
 
 		// advance partition 2 again - this bumps the watermark
-		fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
-		fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
-		fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(13L, part2, 2L);
+		fetcher.emitRecord(14L, part2, 3L);
+		fetcher.emitRecord(15L, part2, 3L);
 
 		processingTimeService.setCurrentTime(30);
 		// this blocks until the periodic thread emitted the watermark