You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/10 09:25:12 UTC

flink git commit: [FLINK-4745] [table] Convert KafkaTableSource test to unit tests

Repository: flink
Updated Branches:
  refs/heads/master b949d42d9 -> 5e30ba384


[FLINK-4745] [table] Convert KafkaTableSource test to unit tests

This closes #2603.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e30ba38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e30ba38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e30ba38

Branch: refs/heads/master
Commit: 5e30ba384934b08861a970744db64b5123cfeff8
Parents: b949d42
Author: twalthr <tw...@apache.org>
Authored: Wed Oct 5 16:55:20 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Oct 10 11:22:19 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka08ITCase.java         | 65 -------------
 .../kafka/Kafka08JsonTableSinkTest.java         |  2 +-
 .../kafka/Kafka08JsonTableSourceTest.java       | 45 +++++++++
 .../connectors/kafka/Kafka09ITCase.java         | 79 ----------------
 .../kafka/Kafka09JsonTableSinkTest.java         |  1 +
 .../kafka/Kafka09JsonTableSourceTest.java       | 45 +++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java | 98 --------------------
 .../kafka/KafkaTableSinkTestBase.java           | 38 +++-----
 .../kafka/KafkaTableSourceTestBase.java         | 77 +++++++++++++++
 9 files changed, 184 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 467ccc5..1c69d78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -19,8 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -33,7 +31,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
@@ -368,66 +365,4 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 		curatorFramework.close();
 	}
-
-	@Test
-	public void testJsonTableSource() throws Exception {
-		String topic = UUID.randomUUID().toString();
-
-		// Names and types are determined in the actual test method of the
-		// base test class.
-		Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
-				topic,
-				standardProps,
-				new String[] {
-						"long",
-						"string",
-						"boolean",
-						"double",
-						"missing-field"},
-				new TypeInformation<?>[] {
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.BOOLEAN_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO });
-
-		// Don't fail on missing field, but set to null (default)
-		tableSource.setFailOnMissingField(false);
-
-		runJsonTableSource(topic, tableSource);
-	}
-
-	@Test
-	public void testJsonTableSourceWithFailOnMissingField() throws Exception {
-		String topic = UUID.randomUUID().toString();
-
-		// Names and types are determined in the actual test method of the
-		// base test class.
-		Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
-				topic,
-				standardProps,
-				new String[] {
-						"long",
-						"string",
-						"boolean",
-						"double",
-						"missing-field"},
-				new TypeInformation<?>[] {
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.BOOLEAN_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO });
-
-		// Don't fail on missing field, but set to null (default)
-		tableSource.setFailOnMissingField(true);
-
-		try {
-			runJsonTableSource(topic, tableSource);
-			fail("Did not throw expected Exception");
-		} catch (Exception e) {
-			Throwable rootCause = e.getCause().getCause().getCause();
-			assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index b1e6db9..446e1d7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -28,7 +28,7 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override
 	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
-				final FlinkKafkaProducerBase<Row> kafkaProducer) {
+			final FlinkKafkaProducerBase<Row> kafkaProducer) {
 
 		return new Kafka08JsonTableSink(topic, properties, partitioner) {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
new file mode 100644
index 0000000..a2d66ac
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+	@Override
+	protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+		return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+		return (Class) JsonRowDeserializationSchema.class;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer08.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 16ddcdc..fd167a0 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -17,16 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.junit.Test;
 
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class Kafka09ITCase extends KafkaConsumerTestBase {
 
 	// ------------------------------------------------------------------------
@@ -126,75 +118,4 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	public void testMetrics() throws Throwable {
 		runMetricsTest();
 	}
-
-	@Test
-	public void testJsonTableSource() throws Exception {
-		String topic = UUID.randomUUID().toString();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-
-		// Names and types are determined in the actual test method of the
-		// base test class.
-		Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
-				topic,
-				props,
-				new String[] {
-						"long",
-						"string",
-						"boolean",
-						"double",
-						"missing-field"},
-				new TypeInformation<?>[] {
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.BOOLEAN_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO });
-
-		// Don't fail on missing field, but set to null (default)
-		tableSource.setFailOnMissingField(false);
-
-		runJsonTableSource(topic, tableSource);
-	}
-
-	@Test
-	public void testJsonTableSourceWithFailOnMissingField() throws Exception {
-		String topic = UUID.randomUUID().toString();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-
-		// Names and types are determined in the actual test method of the
-		// base test class.
-		Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
-				topic,
-				props,
-				new String[] {
-						"long",
-						"string",
-						"boolean",
-						"double",
-						"missing-field"},
-				new TypeInformation<?>[] {
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.BOOLEAN_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO });
-
-		// Don't fail on missing field, but set to null (default)
-		tableSource.setFailOnMissingField(true);
-
-		try {
-			runJsonTableSource(topic, tableSource);
-			fail("Did not throw expected Exception");
-		} catch (Exception e) {
-			Throwable rootCause = e.getCause().getCause().getCause();
-			assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index bfdcf68..068640d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -29,6 +29,7 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 	@Override
 	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
 			final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
 		return new Kafka09JsonTableSink(topic, properties, partitioner) {
 			@Override
 			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
new file mode 100644
index 0000000..4a75f50
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+	@Override
+	protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+		return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+		return (Class) JsonRowDeserializationSchema.class;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer09.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 9c36b43..bafff4f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -787,104 +787,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Runs a table source test with JSON data.
-	 *
-	 * The table source needs to parse the following JSON fields:
-	 * - "long" -> number
-	 * - "string" -> "string"
-	 * - "boolean" -> true|false
-	 * - "double" -> fraction
-	 */
-	public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
-		final ObjectMapper mapper = new ObjectMapper();
-
-		final int numElements = 1024;
-		final long[] longs = new long[numElements];
-		final String[] strings = new String[numElements];
-		final boolean[] booleans = new boolean[numElements];
-		final double[] doubles = new double[numElements];
-
-		final byte[][] serializedJson = new byte[numElements][];
-
-		ThreadLocalRandom random = ThreadLocalRandom.current();
-
-		for (int i = 0; i < numElements; i++) {
-			longs[i] = random.nextLong();
-			strings[i] = Integer.toHexString(random.nextInt());
-			booleans[i] = random.nextBoolean();
-			doubles[i] = random.nextDouble();
-
-			ObjectNode entry = mapper.createObjectNode();
-			entry.put("long", longs[i]);
-			entry.put("string", strings[i]);
-			entry.put("boolean", booleans[i]);
-			entry.put("double", doubles[i]);
-
-			serializedJson[i] = mapper.writeValueAsBytes(entry);
-		}
-
-		// Produce serialized JSON data
-		createTestTopic(topic, 1, 1);
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createRemoteEnvironment("localhost", flinkPort);
-		env.getConfig().disableSysoutLogging();
-
-		env.addSource(new SourceFunction<byte[]>() {
-			@Override
-			public void run(SourceContext<byte[]> ctx) throws Exception {
-				for (int i = 0; i < numElements; i++) {
-					ctx.collect(serializedJson[i]);
-				}
-			}
-
-			@Override
-			public void cancel() {
-			}
-		}).addSink(kafkaServer.getProducer(
-				topic,
-				new ByteArraySerializationSchema(),
-				props,
-				null));
-
-		// Execute blocks
-		env.execute();
-
-		// Register as table source
-		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
-		tableEnvironment.registerTableSource("kafka", kafkaTableSource);
-
-		Table result = tableEnvironment.ingest("kafka");
-
-		tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
-
-			int i = 0;
-
-			@Override
-			public void invoke(Row value) throws Exception {
-				assertEquals(5, value.productArity());
-				assertEquals(longs[i], value.productElement(0));
-				assertEquals(strings[i], value.productElement(1));
-				assertEquals(booleans[i], value.productElement(2));
-				assertEquals(doubles[i], value.productElement(3));
-				assertNull(value.productElement(4));
-
-				if (i == numElements-1) {
-					throw new SuccessException();
-				} else {
-					i++;
-				}
-			}
-		});
-
-		tryExecutePropagateExceptions(env, "KafkaTableSource");
-	}
-
-	/**
 	 * Serialization scheme forwarding byte[] records.
 	 */
 	private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> {

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index e46ca08..baddab1 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -38,39 +38,31 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
-public abstract class KafkaTableSinkTestBase implements Serializable {
-
-	private final static String TOPIC = "testTopic";
-	private final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
-	private final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
-
-	private final KafkaPartitioner<Row> partitioner = new CustomPartitioner();
-	private final Properties properties = createSinkProperties();
+public abstract class KafkaTableSinkTestBase {
+
+	private static final String TOPIC = "testTopic";
+	private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
+	private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+	private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
+	private static final Properties PROPERTIES = createSinkProperties();
+	// we have to mock FlinkKafkaProducerBase as it cannot be instantiated without Kafka
 	@SuppressWarnings("unchecked")
-	private final FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
+	private static final FlinkKafkaProducerBase<Row> PRODUCER = mock(FlinkKafkaProducerBase.class);
 
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testKafkaTableSink() throws Exception {
 		DataStream dataStream = mock(DataStream.class);
-		KafkaTableSink kafkaTableSink = createTableSink();
-		kafkaTableSink.emitDataStream(dataStream);
-
-		verify(dataStream).addSink(kafkaProducer);
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testCreatedProducer() throws Exception {
-		DataStream dataStream = mock(DataStream.class);
 		KafkaTableSink kafkaTableSink = spy(createTableSink());
 		kafkaTableSink.emitDataStream(dataStream);
 
+		verify(dataStream).addSink(eq(PRODUCER));
+
 		verify(kafkaTableSink).createKafkaProducer(
 			eq(TOPIC),
-			eq(properties),
+			eq(PROPERTIES),
 			any(getSerializationSchema()),
-			eq(partitioner));
+			eq(PARTITIONER));
 	}
 
 	@Test
@@ -90,12 +82,12 @@ public abstract class KafkaTableSinkTestBase implements Serializable {
 	protected abstract Class<SerializationSchema<Row>> getSerializationSchema();
 
 	private KafkaTableSink createTableSink() {
-		return createTableSink(TOPIC, properties, partitioner, kafkaProducer);
+		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
 	}
 
 	private static Properties createSinkProperties() {
 		Properties properties = new Properties();
-		properties.setProperty("testKey", "testValue");
+		properties.setProperty("bootstrap.servers", "localhost:12345");
 		return properties;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
new file mode 100644
index 0000000..2a281e8
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaTableSourceTestBase {
+
+	private static final String TOPIC = "testTopic";
+	private static final String[] FIELD_NAMES = new String[] { "long", "string", "boolean", "double", "missing-field" };
+	private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] {
+		BasicTypeInfo.LONG_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.BOOLEAN_TYPE_INFO,
+		BasicTypeInfo.DOUBLE_TYPE_INFO,
+		BasicTypeInfo.LONG_TYPE_INFO };
+	private static final Properties PROPERTIES = createSourceProperties();
+
+	@Test
+	public void testKafkaTableSource() {
+		KafkaTableSource kafkaTableSource = spy(createTableSource());
+		StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
+		kafkaTableSource.getDataStream(env);
+
+		verify(env).addSource(any(getFlinkKafkaConsumer()));
+
+		verify(kafkaTableSource).getKafkaConsumer(
+			eq(TOPIC),
+			eq(PROPERTIES),
+			any(getDeserializationSchema()));
+	}
+
+	protected abstract KafkaTableSource createTableSource(String topic, Properties properties,
+			String[] fieldNames, TypeInformation<?>[] typeInfo);
+
+	protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
+
+	protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
+
+	private KafkaTableSource createTableSource() {
+		return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, FIELD_TYPES);
+	}
+
+	private static Properties createSourceProperties() {
+		Properties properties = new Properties();
+		properties.setProperty("zookeeper.connect", "dummy");
+		properties.setProperty("group.id", "dummy");
+		return properties;
+	}
+}