Posted to commits@flink.apache.org by tz...@apache.org on 2016/11/22 15:51:44 UTC

flink git commit: [FLINK-4155] [kafka] Move partition list fetching to open() for Kafka producers

Repository: flink
Updated Branches:
  refs/heads/master a42eafb40 -> 1f9c0cf85


[FLINK-4155] [kafka] Move partition list fetching to open() for Kafka producers

The partition list fetched from Kafka in open() is sorted by partition id
so that subtasks will see the same list across failures. To compensate for
no longer instantiating a KafkaProducer in the constructor (which eagerly
ensured that the required producer configs were provided), we now check in
the constructor that at least the bootstrap servers are set.
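
In isolation, the new open()-time partition handling amounts to the
following (a minimal sketch only; PartitionOrdering and toSortedPartitionIds
are hypothetical names, not part of this patch):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    import org.apache.kafka.common.PartitionInfo;

    class PartitionOrdering {
        // Turn the (possibly unmodifiable, possibly out-of-order) list returned
        // by KafkaProducer#partitionsFor() into a sorted array of partition ids,
        // so that every subtask derives the same order across restarts.
        static int[] toSortedPartitionIds(List<PartitionInfo> fetched) {
            List<PartitionInfo> copy = new ArrayList<>(fetched); // mutable copy
            Collections.sort(copy, new Comparator<PartitionInfo>() {
                @Override
                public int compare(PartitionInfo a, PartitionInfo b) {
                    return Integer.compare(a.partition(), b.partition());
                }
            });
            int[] ids = new int[copy.size()];
            for (int i = 0; i < ids.length; i++) {
                ids[i] = copy.get(i).partition();
            }
            return ids;
        }
    }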

This change also refactors AtLeastOnceProducerTest into a more complete
suite of tests for FlinkKafkaProducerBase.

This closes #2681.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f9c0cf8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f9c0cf8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f9c0cf8

Branch: refs/heads/master
Commit: 1f9c0cf8522481c1a1007d98d90b30baff5c18ca
Parents: a42eafb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Oct 21 14:23:58 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Nov 22 23:50:25 2016 +0800

----------------------------------------------------------------------
 .../kafka/Kafka08JsonTableSinkTest.java         |   4 +-
 .../connectors/kafka/KafkaProducerTest.java     |  11 +-
 .../kafka/Kafka09JsonTableSinkTest.java         |   4 +-
 .../connectors/kafka/KafkaProducerTest.java     |   9 +-
 .../kafka/FlinkKafkaProducerBase.java           |  42 +--
 .../kafka/AtLeastOnceProducerTest.java          | 231 ---------------
 .../kafka/FlinkKafkaProducerBaseTest.java       | 288 +++++++++++++++++++
 .../kafka/KafkaTableSinkTestBase.java           |  16 +-
 .../testutils/FakeStandardProducerConfig.java   |  34 +++
 9 files changed, 374 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 446e1d7..6d0b140 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -41,8 +41,8 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override
 	@SuppressWarnings("unchecked")
-	protected Class<SerializationSchema<Row>> getSerializationSchema() {
-		return (Class) JsonRowSerializationSchema.class;
+	protected SerializationSchema<Row> getSerializationSchema() {
+		return new JsonRowSerializationSchema(FIELD_NAMES);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 7efa94e..91fc286 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -37,8 +38,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Arrays;
-import java.util.Properties;
+import java.util.Collections;
 import java.util.concurrent.Future;
 
 
@@ -60,7 +60,8 @@ public class KafkaProducerTest extends TestLogger {
 			
 			// partition setup
 			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-					Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+				// returning an unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
+				Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
 
 			// failure when trying to send an element
 			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
@@ -79,7 +80,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (1) producer that propagates errors
 
 			FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
-					"mock_topic", new SimpleStringSchema(), new Properties(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
 
 			OneInputStreamOperatorTestHarness<String, Object> testHarness =
 					new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
@@ -102,7 +103,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (2) producer that only logs errors
 
 			FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
-					"mock_topic", new SimpleStringSchema(), new Properties(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
 			producerLogging.setLogFailuresOnly(true);
 
 			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 068640d..45f70ac 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -41,8 +41,8 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override
 	@SuppressWarnings("unchecked")
-	protected Class<SerializationSchema<Row>> getSerializationSchema() {
-		return (Class) JsonRowSerializationSchema.class;
+	protected SerializationSchema<Row> getSerializationSchema() {
+		return new JsonRowSerializationSchema(FIELD_NAMES);
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 31691d5..18b2aec 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -40,7 +41,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Collections;
-import java.util.Properties;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertNotNull;
@@ -65,7 +65,8 @@ public class KafkaProducerTest extends TestLogger {
 			
 			// partition setup
 			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-					Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
+				// returning an unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
+				Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
 
 			// failure when trying to send an element
 			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
@@ -84,7 +85,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (1) producer that propagates errors
 
 			FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), new Properties(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
 
 			OneInputStreamOperatorTestHarness<String, Object> testHarness =
 					new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
@@ -105,7 +106,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (2) producer that only logs errors
 
 			FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), new Properties(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
 			producerLogging.setLogFailuresOnly(true);
 
 			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index bede064..33289f8 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -46,8 +45,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Collections;
+import java.util.Comparator;
 
 import static java.util.Objects.requireNonNull;
 
@@ -76,7 +78,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	 * Array with the partition ids of the given defaultTopicId
 	 * The size of this array is the number of partitions
 	 */
-	protected final int[] partitions;
+	protected int[] partitions;
 
 	/**
 	 * User defined properties for the Producer
@@ -148,30 +150,22 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		this.schema = serializationSchema;
 		this.producerConfig = producerConfig;
 
-		// set the producer configuration properties.
-		if (!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+		// set the producer configuration properties for kafka record key value serializers.
+		if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
 			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
 		} else {
 			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
 		}
 
-		if (!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+		if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
 			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
 		} else {
 			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
 		}
 
-
-		// create a local KafkaProducer to get the list of partitions.
-		// this will also ensure locally that all required ProducerConfig values are set.
-		try (Producer<Void, IN> getPartitionsProd = getKafkaProducer(this.producerConfig)) {
-			List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(defaultTopicId);
-
-			this.partitions = new int[partitionsList.size()];
-			for (int i = 0; i < partitions.length; i++) {
-				partitions[i] = partitionsList.get(i).partition();
-			}
-			getPartitionsProd.close();
+		// eagerly ensure that bootstrap servers are set.
+		if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+			throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
 		}
 
 		this.partitioner = customPartitioner;
@@ -218,6 +212,22 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	public void open(Configuration configuration) {
 		producer = getKafkaProducer(this.producerConfig);
 
+		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
+		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+			@Override
+			public int compare(PartitionInfo o1, PartitionInfo o2) {
+				return Integer.compare(o1.partition(), o2.partition());
+			}
+		});
+
+		partitions = new int[partitionsList.size()];
+		for (int i = 0; i < partitions.length; i++) {
+			partitions[i] = partitionsList.get(i).partition();
+		}
+
 		RuntimeContext ctx = getRuntimeContext();
 		if (partitioner != null) {
 			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
deleted file mode 100644
index 6d92f9b..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.Assert;
-import org.junit.Test;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Test ensuring that the producer is not dropping buffered records
- */
-@SuppressWarnings("unchecked")
-public class AtLeastOnceProducerTest {
-
-	// we set a timeout because the test will not finish if the logic is broken
-	@Test(timeout=5000)
-	public void testAtLeastOnceProducer() throws Throwable {
-		runTest(true);
-	}
-
-	// This test ensures that the actual test fails if the flushing is disabled
-	@Test(expected = AssertionError.class, timeout=5000)
-	public void ensureTestFails() throws Throwable {
-		runTest(false);
-	}
-
-	private void runTest(boolean flushOnCheckpoint) throws Throwable {
-		Properties props = new Properties();
-		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
-
-		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
-				snapshottingFinished);
-
-		producer.setFlushOnCheckpoint(flushOnCheckpoint);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness =
-				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
-
-		testHarness.open();
-
-		for (int i = 0; i < 100; i++) {
-			testHarness.processElement(new StreamRecord<>("msg-" + i));
-		}
-
-		// start a thread confirming all pending records
-		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
-		final Thread threadA = Thread.currentThread();
-
-		Runnable confirmer = new Runnable() {
-			@Override
-			public void run() {
-				try {
-					MockProducer mp = producer.getProducerInstance();
-					List<Callback> pending = mp.getPending();
-
-					// we need to find out if the snapshot() method blocks forever
-					// this is not possible. If snapshot() is running, it will
-					// start removing elements from the pending list.
-					synchronized (threadA) {
-						threadA.wait(500L);
-					}
-					// we now check that no records have been confirmed yet
-					Assert.assertEquals(100, pending.size());
-					Assert.assertFalse("Snapshot method returned before all records were confirmed",
-							snapshottingFinished.get());
-
-					// now confirm all checkpoints
-					for (Callback c: pending) {
-						c.onCompletion(null, null);
-					}
-					pending.clear();
-				} catch(Throwable t) {
-					runnableError.f0 = t;
-				}
-			}
-		};
-		Thread threadB = new Thread(confirmer);
-		threadB.start();
-
-		// this should block:
-		testHarness.snapshot(0, 0);
-
-		synchronized (threadA) {
-			threadA.notifyAll(); // just in case, to let the test fail faster
-		}
-		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
-		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
-		while (deadline.hasTimeLeft() && threadB.isAlive()) {
-			threadB.join(500);
-		}
-		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
-		if (runnableError.f0 != null) {
-			throw runnableError.f0;
-		}
-
-		testHarness.close();
-	}
-
-
-	private static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
-		private static final long serialVersionUID = 1L;
-
-		private transient MockProducer prod;
-		private AtomicBoolean snapshottingFinished;
-
-		public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, AtomicBoolean snapshottingFinished) {
-			super(defaultTopicId, serializationSchema, producerConfig, null);
-			this.snapshottingFinished = snapshottingFinished;
-		}
-
-		@Override
-		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
-			this.prod = new MockProducer();
-			return this.prod;
-		}
-
-		@Override
-		public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
-			// call the actual snapshot state
-			super.snapshotState(ctx);
-			// notify test that snapshotting has been done
-			snapshottingFinished.set(true);
-		}
-
-		@Override
-		protected void flush() {
-			this.prod.flush();
-		}
-
-		public MockProducer getProducerInstance() {
-			return this.prod;
-		}
-	}
-
-	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
-		List<Callback> pendingCallbacks = new ArrayList<>();
-
-		private static Properties getFakeProperties() {
-			Properties p = new Properties();
-			p.setProperty("bootstrap.servers", "localhost:12345");
-			p.setProperty("key.serializer", ByteArraySerializer.class.getName());
-			p.setProperty("value.serializer", ByteArraySerializer.class.getName());
-			return p;
-		}
-		public MockProducer() {
-			super(getFakeProperties());
-		}
-
-		@Override
-		public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
-			throw new UnsupportedOperationException("Unexpected");
-		}
-
-		@Override
-		public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
-			pendingCallbacks.add(callback);
-			return null;
-		}
-
-		@Override
-		public List<PartitionInfo> partitionsFor(String topic) {
-			List<PartitionInfo> list = new ArrayList<>();
-			list.add(new PartitionInfo(topic, 0, null, null, null));
-			return list;
-		}
-
-		@Override
-		public Map<MetricName, ? extends Metric> metrics() {
-			return null;
-		}
-
-
-		public List<Callback> getPending() {
-			return this.pendingCallbacks;
-		}
-
-		public void flush() {
-			while (pendingCallbacks.size() > 0) {
-				try {
-					Thread.sleep(10);
-				} catch (InterruptedException e) {
-					throw new RuntimeException("Unable to flush producer, task was interrupted");
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
new file mode 100644
index 0000000..2e06160
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class FlinkKafkaProducerBaseTest {
+
+	/**
+	 * Tests that the constructor eagerly checks that the bootstrap servers are set in the config
+	 */
+	@Test(expected = IllegalArgumentException.class)
+	public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
+		// no bootstrap servers set in props
+		Properties props = new Properties();
+		// should throw IllegalArgumentException
+		new DummyFlinkKafkaProducer<>(props, null);
+	}
+
+	/**
+	 * Tests that the constructor defaults the key/value serializers in the config to ByteArraySerializer if not set
+	 */
+	@Test
+	public void testKeyValueSerializersSetIfMissing() throws Exception {
+		Properties props = new Properties();
+		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
+		// should set missing key/value serializers
+		new DummyFlinkKafkaProducer<>(props, null);
+
+		assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+		assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+		assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
+		assertTrue(props.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
+	}
+
+	/**
+	 * Tests that the partition list is deterministic and correctly provided to the custom partitioner
+	 */
+	@Test
+	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
+		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
+		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
+		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
+
+		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
+			FakeStandardProducerConfig.get(), mockPartitioner);
+		producer.setRuntimeContext(mockRuntimeContext);
+
+		producer.open(new Configuration());
+
+		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
+		// which should be sorted before being provided to the custom partitioner's open() method
+		int[] correctPartitionList = {0, 1, 2, 3};
+		verify(mockPartitioner).open(0, 1, correctPartitionList);
+	}
+
+	/**
+	 * Test ensuring that the producer is not dropping buffered records;
+	 * we set a timeout because the test will not finish if the logic is broken
+	 */
+	@Test(timeout=5000)
+	public void testAtLeastOnceProducer() throws Throwable {
+		runAtLeastOnceTest(true);
+	}
+
+	/**
+	 * Ensures that the at-least-once test fails if flushing is disabled
+	 */
+	@Test(expected = AssertionError.class, timeout=5000)
+	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
+		runAtLeastOnceTest(false);
+	}
+
+	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
+		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+			FakeStandardProducerConfig.get(), null, snapshottingFinished);
+		producer.setFlushOnCheckpoint(flushOnCheckpoint);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
+
+		testHarness.open();
+
+		for (int i = 0; i < 100; i++) {
+			testHarness.processElement(new StreamRecord<>("msg-" + i));
+		}
+
+		// start a thread confirming all pending records
+		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
+		final Thread threadA = Thread.currentThread();
+
+		Runnable confirmer = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					MockProducer mp = producer.getProducerInstance();
+					List<Callback> pending = mp.getPending();
+
+					// we need to find out whether the snapshot() method blocks;
+					// since we cannot detect "blocks forever" directly, we wait briefly
+					// and then check that no pending records have been confirmed yet
+					synchronized (threadA) {
+						threadA.wait(500L);
+					}
+					// we now check that no records have been confirmed yet
+					Assert.assertEquals(100, pending.size());
+					Assert.assertFalse("Snapshot method returned before all records were confirmed",
+						snapshottingFinished.get());
+
+					// now confirm all checkpoints
+					for (Callback c: pending) {
+						c.onCompletion(null, null);
+					}
+					pending.clear();
+				} catch(Throwable t) {
+					runnableError.f0 = t;
+				}
+			}
+		};
+		Thread threadB = new Thread(confirmer);
+		threadB.start();
+
+		// this should block:
+		testHarness.snapshot(0, 0);
+
+		synchronized (threadA) {
+			threadA.notifyAll(); // just in case, to let the test fail faster
+		}
+		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
+		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+		while (deadline.hasTimeLeft() && threadB.isAlive()) {
+			threadB.join(500);
+		}
+		Assert.assertFalse("Thread B is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
+		if (runnableError.f0 != null) {
+			throw runnableError.f0;
+		}
+
+		testHarness.close();
+	}
+
+
+	// ------------------------------------------------------------------------
+
+	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
+		private static final long serialVersionUID = 1L;
+
+		private transient MockProducer prod;
+		private AtomicBoolean snapshottingFinished;
+
+		@SuppressWarnings("unchecked")
+		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
+			super("dummy-topic", (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+			this.snapshottingFinished = snapshottingFinished;
+		}
+
+		// constructor variant for tests unrelated to snapshotting
+		@SuppressWarnings("unchecked")
+		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
+			super("dummy-topic", (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+			this.snapshottingFinished = new AtomicBoolean(true);
+		}
+
+		@Override
+		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
+			this.prod = new MockProducer();
+			return this.prod;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+			// call the actual snapshot state
+			super.snapshotState(ctx);
+			// notify test that snapshotting has been done
+			snapshottingFinished.set(true);
+		}
+
+		@Override
+		protected void flush() {
+			this.prod.flush();
+		}
+
+		public MockProducer getProducerInstance() {
+			return this.prod;
+		}
+	}
+
+	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
+		List<Callback> pendingCallbacks = new ArrayList<>();
+
+		public MockProducer() {
+			super(FakeStandardProducerConfig.get());
+		}
+
+		@Override
+		public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+			throw new UnsupportedOperationException("Unexpected");
+		}
+
+		@Override
+		public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+			pendingCallbacks.add(callback);
+			return null;
+		}
+
+		@Override
+		public List<PartitionInfo> partitionsFor(String topic) {
+			List<PartitionInfo> list = new ArrayList<>();
+			// deliberately return an out-of-order partition list
+			list.add(new PartitionInfo(topic, 3, null, null, null));
+			list.add(new PartitionInfo(topic, 1, null, null, null));
+			list.add(new PartitionInfo(topic, 0, null, null, null));
+			list.add(new PartitionInfo(topic, 2, null, null, null));
+			return list;
+		}
+
+		@Override
+		public Map<MetricName, ? extends Metric> metrics() {
+			return null;
+		}
+
+
+		public List<Callback> getPending() {
+			return this.pendingCallbacks;
+		}
+
+		public void flush() {
+			while (pendingCallbacks.size() > 0) {
+				try {
+					Thread.sleep(10);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Unable to flush producer, task was interrupted");
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index baddab1..ae0af52 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.junit.Test;
 
@@ -41,18 +42,23 @@ import static org.mockito.Mockito.verify;
 public abstract class KafkaTableSinkTestBase {
 
 	private static final String TOPIC = "testTopic";
-	private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
+	protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
 	private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
 	private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
 	private static final Properties PROPERTIES = createSinkProperties();
-	// we have to mock FlinkKafkaProducerBase as it cannot be instantiated without Kafka
 	@SuppressWarnings("unchecked")
-	private static final FlinkKafkaProducerBase<Row> PRODUCER = mock(FlinkKafkaProducerBase.class);
+	private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
+		TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
+
+		@Override
+		protected void flush() {}
+	};
 
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testKafkaTableSink() throws Exception {
 		DataStream dataStream = mock(DataStream.class);
+
 		KafkaTableSink kafkaTableSink = spy(createTableSink());
 		kafkaTableSink.emitDataStream(dataStream);
 
@@ -61,7 +67,7 @@ public abstract class KafkaTableSinkTestBase {
 		verify(kafkaTableSink).createKafkaProducer(
 			eq(TOPIC),
 			eq(PROPERTIES),
-			any(getSerializationSchema()),
+			any(getSerializationSchema().getClass()),
 			eq(PARTITIONER));
 	}
 
@@ -79,7 +85,7 @@ public abstract class KafkaTableSinkTestBase {
 	protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
 			KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
 
-	protected abstract Class<SerializationSchema<Row>> getSerializationSchema();
+	protected abstract SerializationSchema<Row> getSerializationSchema();
 
 	private KafkaTableSink createTableSink() {
 		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
new file mode 100644
index 0000000..055326d
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+
+public class FakeStandardProducerConfig {
+
+	public static Properties get() {
+		Properties p = new Properties();
+		p.setProperty("bootstrap.servers", "localhost:12345");
+		p.setProperty("key.serializer", ByteArraySerializer.class.getName());
+		p.setProperty("value.serializer", ByteArraySerializer.class.getName());
+		return p;
+	}
+
+}