You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:58 UTC

[08/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
deleted file mode 100644
index 25040eb..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the {@link Handover} between Kafka Consumer Thread and the fetcher's main thread. 
- */
-public class HandoverTest {
-
-	// ------------------------------------------------------------------------
-	//  test produce / consumer
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testWithVariableProducer() throws Exception {
-		runProducerConsumerTest(500, 2, 0);
-	}
-
-	@Test
-	public void testWithVariableConsumer() throws Exception {
-		runProducerConsumerTest(500, 0, 2);
-	}
-
-	@Test
-	public void testWithVariableBoth() throws Exception {
-		runProducerConsumerTest(500, 2, 2);
-	}
-
-	// ------------------------------------------------------------------------
-	//  test error propagation
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testPublishErrorOnEmptyHandover() throws Exception {
-		final Handover handover = new Handover();
-
-		Exception error = new Exception();
-		handover.reportError(error);
-
-		try {
-			handover.pollNext();
-			fail("should throw an exception");
-		}
-		catch (Exception e) {
-			assertEquals(error, e);
-		}
-	}
-
-	@Test
-	public void testPublishErrorOnFullHandover() throws Exception {
-		final Handover handover = new Handover();
-		handover.produce(createTestRecords());
-
-		IOException error = new IOException();
-		handover.reportError(error);
-
-		try {
-			handover.pollNext();
-			fail("should throw an exception");
-		}
-		catch (Exception e) {
-			assertEquals(error, e);
-		}
-	}
-
-	@Test
-	public void testExceptionMarksClosedOnEmpty() throws Exception {
-		final Handover handover = new Handover();
-
-		IllegalStateException error = new IllegalStateException();
-		handover.reportError(error);
-
-		try {
-			handover.produce(createTestRecords());
-			fail("should throw an exception");
-		}
-		catch (Handover.ClosedException e) {
-			// expected
-		}
-	}
-
-	@Test
-	public void testExceptionMarksClosedOnFull() throws Exception {
-		final Handover handover = new Handover();
-		handover.produce(createTestRecords());
-
-		LinkageError error = new LinkageError();
-		handover.reportError(error);
-
-		try {
-			handover.produce(createTestRecords());
-			fail("should throw an exception");
-		}
-		catch (Handover.ClosedException e) {
-			// expected
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test closing behavior
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testCloseEmptyForConsumer() throws Exception {
-		final Handover handover = new Handover();
-		handover.close();
-
-		try {
-			handover.pollNext();
-			fail("should throw an exception");
-		}
-		catch (Handover.ClosedException e) {
-			// expected
-		}
-	}
-
-	@Test
-	public void testCloseFullForConsumer() throws Exception {
-		final Handover handover = new Handover();
-		handover.produce(createTestRecords());
-		handover.close();
-
-		try {
-			handover.pollNext();
-			fail("should throw an exception");
-		}
-		catch (Handover.ClosedException e) {
-			// expected
-		}
-	}
-
-	@Test
-	public void testCloseEmptyForProducer() throws Exception {
-		final Handover handover = new Handover();
-		handover.close();
-
-		try {
-			handover.produce(createTestRecords());
-			fail("should throw an exception");
-		}
-		catch (Handover.ClosedException e) {
-			// expected
-		}
-	}
-
-	@Test
-	public void testCloseFullForProducer() throws Exception {
-		final Handover handover = new Handover();
-		handover.produce(createTestRecords());
-		handover.close();
-
-		try {
-			handover.produce(createTestRecords());
-			fail("should throw an exception");
-		}
-		catch (Handover.ClosedException e) {
-			// expected
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test wake up behavior
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
-		Handover handover = new Handover();
-		handover.wakeupProducer();
-
-		// produce into a woken but empty handover
-		try {
-			handover.produce(createTestRecords());
-		}
-		catch (Handover.WakeupException e) {
-			fail();
-		}
-
-		// handover now has records, next time we wakeup and produce it needs
-		// to throw an exception
-		handover.wakeupProducer();
-		try {
-			handover.produce(createTestRecords());
-			fail("should throw an exception");
-		}
-		catch (Handover.WakeupException e) {
-			// expected
-		}
-
-		// empty the handover
-		assertNotNull(handover.pollNext());
-		
-		// producing into an empty handover should work
-		try {
-			handover.produce(createTestRecords());
-		}
-		catch (Handover.WakeupException e) {
-			fail();
-		}
-	}
-
-	@Test
-	public void testWakeupWakesOnlyOnce() throws Exception {
-		// create a full handover
-		final Handover handover = new Handover();
-		handover.produce(createTestRecords());
-
-		handover.wakeupProducer();
-
-		try {
-			handover.produce(createTestRecords());
-			fail();
-		} catch (WakeupException e) {
-			// expected
-		}
-
-		CheckedThread producer = new CheckedThread() {
-			@Override
-			public void go() throws Exception {
-				handover.produce(createTestRecords());
-			}
-		};
-		producer.start();
-
-		// the producer must go blocking
-		producer.waitUntilThreadHoldsLock(10000);
-
-		// release the thread by consuming something
-		assertNotNull(handover.pollNext());
-		producer.sync();
-	}
-
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
-		// generate test data
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final ConsumerRecords<byte[], byte[]>[] data = new ConsumerRecords[numRecords];
-		for (int i = 0; i < numRecords; i++) {
-			data[i] = createTestRecords();
-		}
-
-		final Handover handover = new Handover();
-
-		ProducerThread producer = new ProducerThread(handover, data, maxProducerDelay);
-		ConsumerThread consumer = new ConsumerThread(handover, data, maxConsumerDelay);
-
-		consumer.start();
-		producer.start();
-
-		// sync first on the consumer, so it propagates assertion errors
-		consumer.sync();
-		producer.sync();
-	}
-
-	@SuppressWarnings("unchecked")
-	private static ConsumerRecords<byte[], byte[]> createTestRecords() {
-		return mock(ConsumerRecords.class);
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static abstract class CheckedThread extends Thread {
-
-		private volatile Throwable error;
-
-		public abstract void go() throws Exception;
-
-		@Override
-		public void run() {
-			try {
-				go();
-			}
-			catch (Throwable t) {
-				error = t;
-			}
-		}
-
-		public void sync() throws Exception {
-			join();
-			if (error != null) {
-				ExceptionUtils.rethrowException(error, error.getMessage());
-			}
-		}
-
-		public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
-			final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
-			
-			while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
-				Thread.sleep(1);
-			}
-
-			if (!isBlockedOrWaiting()) {
-				throw new TimeoutException();
-			}
-		}
-
-		private boolean isBlockedOrWaiting() {
-			State state = getState();
-			return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING;
-		}
-	}
-
-	private static class ProducerThread extends CheckedThread {
-
-		private final Random rnd = new Random();
-		private final Handover handover;
-		private final ConsumerRecords<byte[], byte[]>[] data;
-		private final int maxDelay;
-
-		private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
-			this.handover = handover;
-			this.data = data;
-			this.maxDelay = maxDelay;
-		}
-
-		@Override
-		public void go() throws Exception {
-			for (ConsumerRecords<byte[], byte[]> rec : data) {
-				handover.produce(rec);
-
-				if (maxDelay > 0) {
-					int delay = rnd.nextInt(maxDelay);
-					Thread.sleep(delay);
-				}
-			}
-		}
-	}
-
-	private static class ConsumerThread extends CheckedThread {
-
-		private final Random rnd = new Random();
-		private final Handover handover;
-		private final ConsumerRecords<byte[], byte[]>[] data;
-		private final int maxDelay;
-
-		private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
-			this.handover = handover;
-			this.data = data;
-			this.maxDelay = maxDelay;
-		}
-
-		@Override
-		public void go() throws Exception {
-			for (ConsumerRecords<byte[], byte[]> rec : data) {
-				ConsumerRecords<byte[], byte[]> next = handover.pollNext();
-
-				assertEquals(rec, next);
-
-				if (maxDelay > 0) {
-					int delay = rnd.nextInt(maxDelay);
-					Thread.sleep(delay);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
deleted file mode 100644
index 4ac1773..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.zookeeper=OFF, testlogger
-log4j.logger.state.change.logger=OFF, testlogger
-log4j.logger.kafka=OFF, testlogger
-
-log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
deleted file mode 100644
index ef71bde..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ /dev/null
@@ -1,212 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-kafka-base_2.10</artifactId>
-	<name>flink-connector-kafka-base</name>
-
-	<packaging>jar</packaging>
-
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<kafka.version>0.8.2.2</kafka.version>
-	</properties>
-
-	<dependencies>
-
-		<!-- core dependencies -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-			<!-- Projects depending on this project,
-			won't depend on flink-table. -->
-			<optional>true</optional>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_${scala.binary.version}</artifactId>
-			<version>${kafka.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>com.sun.jmx</groupId>
-					<artifactId>jmxri</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.sun.jdmk</groupId>
-					<artifactId>jmxtools</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-simple</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>net.sf.jopt-simple</groupId>
-					<artifactId>jopt-simple</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.scala-lang</groupId>
-					<artifactId>scala-reflect</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.scala-lang</groupId>
-					<artifactId>scala-compiler</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.yammer.metrics</groupId>
-					<artifactId>metrics-annotation</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.xerial.snappy</groupId>
-					<artifactId>snappy-java</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<!-- test dependencies -->
-		
-		<!-- force using the latest zkclient -->
-		<dependency>
-			<groupId>com.101tec</groupId>
-			<artifactId>zkclient</artifactId>
-			<version>0.7</version>
-			<type>jar</type>
-			<scope>test</scope>
-		</dependency>
-
-
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-test</artifactId>
-			<version>${curator.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-metrics-jmx</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_2.10</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-minikdc</artifactId>
-			<version>${minikdc.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-	</dependencies>
-
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>com.101tec</groupId>
-				<artifactId>zkclient</artifactId>
-				<version>0.7</version>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
-	
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<!--
-            https://issues.apache.org/jira/browse/DIRSHARED-134
-            Required to pull the Mini-KDC transitive dependency
-            -->
-			<plugin>
-				<groupId>org.apache.felix</groupId>
-				<artifactId>maven-bundle-plugin</artifactId>
-				<version>3.0.1</version>
-				<inherited>true</inherited>
-				<extensions>true</extensions>
-			</plugin>
-		</plugins>
-	</build>
-	
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
deleted file mode 100644
index aef7116..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class of all Flink Kafka Consumer data sources.
- * This implements the common behavior across all Kafka versions.
- * 
- * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
- * {@link AbstractFetcher}.
- * 
- * @param <T> The type of records produced by this data source
- */
-public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
-		CheckpointListener,
-		ResultTypeQueryable<T>,
-		CheckpointedFunction {
-	private static final long serialVersionUID = -6272159445203409112L;
-
-	protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
-	
-	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
-	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
-	/** Boolean configuration key to disable metrics tracking **/
-	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
-
-	// ------------------------------------------------------------------------
-	//  configuration state, set on the client relevant for all subtasks
-	// ------------------------------------------------------------------------
-
-	private final List<String> topics;
-	
-	/** The schema to convert between Kafka's byte messages, and Flink's objects */
-	protected final KeyedDeserializationSchema<T> deserializer;
-
-	/** The set of topic partitions that the source will read */
-	protected List<KafkaTopicPartition> subscribedPartitions;
-	
-	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
-	 * to exploit per-partition timestamp characteristics.
-	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
-	private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
-	
-	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
-	 * to exploit per-partition timestamp characteristics. 
-	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
-	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
-
-	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
-
-	// ------------------------------------------------------------------------
-	//  runtime state (used individually by each parallel subtask) 
-	// ------------------------------------------------------------------------
-	
-	/** Data for pending but uncommitted offsets */
-	private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
-	/** The fetcher implements the connections to the Kafka brokers */
-	private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
-	
-	/** The offsets to restore to, if the consumer restores state from a checkpoint */
-	private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
-	
-	/** Flag indicating whether the consumer is still running **/
-	private volatile boolean running = true;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Base constructor.
-	 *
-	 * @param deserializer
-	 *           The deserializer to turn raw byte messages into Java/Scala objects.
-	 */
-	public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
-		this.topics = checkNotNull(topics);
-		checkArgument(topics.size() > 0, "You have to define at least one topic.");
-		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
-	}
-
-	/**
-	 * This method must be called from the subclasses, to set the list of all subscribed partitions
-	 * that this consumer will fetch from (across all subtasks).
-	 * 
-	 * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
-	 */
-	protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
-		checkNotNull(allSubscribedPartitions);
-		this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Configuration
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
-	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
-	 * in the same way as in the Flink runtime, when streams are merged.
-	 * 
-	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
-	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
-	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
-	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
-	 * parallel source subtask reads more than one partition.
-	 * 
-	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
-	 * partition, allows users to let them exploit the per-partition characteristics.
-	 * 
-	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
-	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
-	 * 
-	 * @param assigner The timestamp assigner / watermark generator to use.
-	 * @return The consumer object, to allow function chaining.   
-	 */
-	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
-		checkNotNull(assigner);
-		
-		if (this.periodicWatermarkAssigner != null) {
-			throw new IllegalStateException("A periodic watermark emitter has already been set.");
-		}
-		try {
-			ClosureCleaner.clean(assigner, true);
-			this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
-			return this;
-		} catch (Exception e) {
-			throw new IllegalArgumentException("The given assigner is not serializable", e);
-		}
-	}
-
-	/**
-	 * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks in a periodic manner.
-	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
-	 * in the same way as in the Flink runtime, when streams are merged.
-	 *
-	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
-	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
-	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
-	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
-	 * parallel source subtask reads more that one partition.
-	 *
-	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
-	 * partition, allows users to let them exploit the per-partition characteristics.
-	 *
-	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
-	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
-	 *
-	 * @param assigner The timestamp assigner / watermark generator to use.
-	 * @return The consumer object, to allow function chaining.   
-	 */
-	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
-		checkNotNull(assigner);
-		
-		if (this.punctuatedWatermarkAssigner != null) {
-			throw new IllegalStateException("A punctuated watermark emitter has already been set.");
-		}
-		try {
-			ClosureCleaner.clean(assigner, true);
-			this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
-			return this;
-		} catch (Exception e) {
-			throw new IllegalArgumentException("The given assigner is not serializable", e);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Work methods
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Main work method of the source.
-	 *
-	 * <p>If this subtask has partitions assigned, a fetcher is created, optionally reset to the
-	 * restored offsets, published in {@code kafkaFetcher}, and its fetch loop is run. If no
-	 * partitions are assigned, a {@code Long.MAX_VALUE} watermark is emitted and the method
-	 * blocks until the source is no longer running.
-	 *
-	 * @param sourceContext The context to emit elements and watermarks to.
-	 * @throws Exception If the partitions were never set, or forwarded from the fetcher.
-	 */
-	@Override
-	public void run(SourceContext<T> sourceContext) throws Exception {
-		if (subscribedPartitions == null) {
-			throw new Exception("The partitions were not set for the consumer");
-		}
-
-		// we need only do work, if we actually have partitions assigned
-		if (!subscribedPartitions.isEmpty()) {
-
-			// (1) create the fetcher that will communicate with the Kafka brokers
-			final AbstractFetcher<T, ?> fetcher = createFetcher(
-					sourceContext, subscribedPartitions,
-					periodicWatermarkAssigner, punctuatedWatermarkAssigner,
-					(StreamingRuntimeContext) getRuntimeContext());
-
-			// (2) set the fetcher to the restored checkpoint offsets
-			if (restoreToOffset != null) {
-				fetcher.restoreOffsets(restoreToOffset);
-			}
-
-			// publish the reference, for snapshot-, commit-, and cancel calls
-			// IMPORTANT: We can only do that now, because only now will calls to
-			//            the fetchers 'snapshotCurrentState()' method return at least
-			//            the restored offsets
-			this.kafkaFetcher = fetcher;
-			// cancel() may have run before the reference was published, in which case
-			// it could not cancel the fetcher; re-check the flag before starting the loop
-			if (!running) {
-				return;
-			}
-			
-			// (3) run the fetcher's main work method
-			fetcher.runFetchLoop();
-		}
-		else {
-			// this source never completes, so emit a Long.MAX_VALUE watermark
-			// to not block watermark forwarding
-			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-
-			// wait until this is canceled
-			final Object waitLock = new Object();
-			while (running) {
-				try {
-					//noinspection SynchronizationOnLocalVariableOrMethodParameter
-					synchronized (waitLock) {
-						// nothing ever notifies this local lock; the wait ends only via interrupt
-						waitLock.wait();
-					}
-				}
-				catch (InterruptedException e) {
-					// an interrupt while still running is ignored and the loop keeps waiting
-					if (!running) {
-						// restore the interrupted state, and fall through the loop
-						Thread.currentThread().interrupt();
-					}
-				}
-			}
-		}
-	}
-
-	@Override
-	public void cancel() {
-		// flip the flag first, so that the checks in run() observe the cancellation
-		running = false;
-
-		// forward the cancellation to the fetcher, if one has been published already
-		final AbstractFetcher<?, ?> fetcher = kafkaFetcher;
-		if (fetcher != null) {
-			fetcher.cancel();
-		}
-
-		// there will be an interrupt() call to the main thread anyways
-	}
-
-	@Override
-	public void open(Configuration configuration) {
-		final List<KafkaTopicPartition> allPartitions = getKafkaPartitions(topics);
-
-		// without a partition list there is nothing to assign to this subtask
-		if (allPartitions == null) {
-			return;
-		}
-
-		assignTopicPartitions(allPartitions);
-	}
-
-	/**
-	 * Closes the source by cancelling it and then delegating to the parent's {@code close()}.
-	 *
-	 * @throws Exception Forwarded from {@code cancel()} or {@code super.close()}.
-	 */
-	@Override
-	public void close() throws Exception {
-		// pretty much the same logic as cancelling
-		try {
-			cancel();
-		} finally {
-			// the parent's close() runs even if cancelling fails
-			super.close();
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Checkpoint and restore
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		// obtain the list state that holds the (partition, offset) pairs of a checkpoint
-		offsetsStateForCheckpoint = context.getOperatorStateStore()
-				.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
-
-		if (!context.isRestored()) {
-			LOG.info("No restore state for FlinkKafkaConsumer.");
-			return;
-		}
-
-		// copy the restored (partition, offset) pairs into the map used at startup
-		restoreToOffset = new HashMap<>();
-		for (Tuple2<KafkaTopicPartition, Long> restoredEntry : offsetsStateForCheckpoint.get()) {
-			restoreToOffset.put(restoredEntry.f0, restoredEntry.f1);
-		}
-
-		LOG.info("Setting restore state in the FlinkKafkaConsumer.");
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Using the following offsets: {}", restoreToOffset);
-		}
-	}
-
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
-		if (!running) {
-			LOG.debug("snapshotState() called on closed source");
-			return;
-		}
-
-		offsetsStateForCheckpoint.clear();
-
-		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
-		if (fetcher != null) {
-			final HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
-
-			// the map cannot be asynchronously updated, because only one checkpoint call can happen
-			// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-			pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
-
-			for (Map.Entry<KafkaTopicPartition, Long> offset : currentOffsets.entrySet()) {
-				offsetsStateForCheckpoint.add(Tuple2.of(offset.getKey(), offset.getValue()));
-			}
-		} else {
-			// the fetcher has not yet been initialized, which means we need to return the
-			// originally restored offsets or the assigned partitions
-			if (restoreToOffset != null) {
-				for (Map.Entry<KafkaTopicPartition, Long> restored : restoreToOffset.entrySet()) {
-					offsetsStateForCheckpoint.add(Tuple2.of(restored.getKey(), restored.getValue()));
-				}
-			} else if (subscribedPartitions != null) {
-				// no offsets are known yet, so every assigned partition is marked as "offset not set"
-				for (KafkaTopicPartition partition : subscribedPartitions) {
-					offsetsStateForCheckpoint.add(
-							Tuple2.of(partition, KafkaTopicPartitionState.OFFSET_NOT_SET));
-				}
-			}
-
-			// the map cannot be asynchronously updated, because only one checkpoint call can happen
-			// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-			pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
-		}
-
-		// truncate the map of pending offsets to commit, to prevent infinite growth
-		while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
-			pendingOffsetsToCommit.remove(0);
-		}
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		if (!running) {
-			LOG.debug("notifyCheckpointComplete() called on closed source");
-			return;
-		}
-
-		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
-		if (fetcher == null) {
-			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
-			return;
-		}
-
-		// only one commit operation must be in progress
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
-		}
-
-		try {
-			final int checkpointIndex = pendingOffsetsToCommit.indexOf(checkpointId);
-			if (checkpointIndex == -1) {
-				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
-				return;
-			}
-
-			@SuppressWarnings("unchecked")
-			final HashMap<KafkaTopicPartition, Long> offsetsToCommit =
-					(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(checkpointIndex);
-
-			// drop all entries that were registered before the confirmed checkpoint
-			for (int remaining = checkpointIndex; remaining > 0; remaining--) {
-				pendingOffsetsToCommit.remove(0);
-			}
-
-			if (offsetsToCommit != null && !offsetsToCommit.isEmpty()) {
-				fetcher.commitInternalOffsetsToKafka(offsetsToCommit);
-			} else {
-				LOG.debug("Checkpoint state was empty.");
-			}
-		}
-		catch (Exception e) {
-			// failures are only reported while running; after cancellation they are ignored
-			if (running) {
-				throw e;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Kafka Consumer specific methods
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the
-	 * data, and emits it into the data streams.
-	 * 
-	 * @param sourceContext The source context to emit data to.
-	 * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
-	 * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
-	 * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
-	 * @param runtimeContext The task's runtime context.
-	 * 
-	 * @return The instantiated fetcher
-	 * 
-	 * @throws Exception The method should forward exceptions
-	 */
-	protected abstract AbstractFetcher<T, ?> createFetcher(
-			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> thisSubtaskPartitions,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext) throws Exception;
-
-	/**
-	 * Returns the Kafka partitions of the given topics.
-	 *
-	 * <p>The result is used in {@code open()} to decide which partitions this subtask subscribes to;
-	 * a {@code null} result makes {@code open()} skip the partition assignment.
-	 *
-	 * @param topics The topics to look up the partitions for.
-	 * @return The partitions of the given topics.
-	 */
-	protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics);
-	
-	// ------------------------------------------------------------------------
-	//  ResultTypeQueryable methods 
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public TypeInformation<T> getProducedType() {
-		// the produced type is whatever the deserializer reports
-		final TypeInformation<T> producedType = deserializer.getProducedType();
-		return producedType;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Determines the partitions this subtask subscribes to and stores them in {@code subscribedPartitions}.
-	 *
-	 * <p>If offsets were restored from a checkpoint, the subtask subscribes to exactly those of the
-	 * given partitions for which a restored offset exists. Otherwise the given list is sorted by topic
-	 * and partition id, and the partitions are distributed round-robin across the parallel subtasks.
-	 *
-	 * @param kafkaTopicPartitions All candidate partitions; sorted in place if no state was restored.
-	 */
-	private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
-		subscribedPartitions = new ArrayList<>();
-
-		if (restoreToOffset != null) {
-			// restored: keep only the partitions that were part of the restored state
-			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-				if (restoreToOffset.containsKey(kafkaTopicPartition)) {
-					subscribedPartitions.add(kafkaTopicPartition);
-				}
-			}
-		} else {
-			// sort by (topic, partition), so that the round-robin assignment below
-			// is based on one common order of the list
-			Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() {
-				@Override
-				public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) {
-					int topicComparison = o1.getTopic().compareTo(o2.getTopic());
-					if (topicComparison != 0) {
-						return topicComparison;
-					}
-
-					// Integer.compare instead of a subtraction, which is prone to overflow
-					return Integer.compare(o1.getPartition(), o2.getPartition());
-				}
-			});
-
-			final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-			final int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
-			// every numSubtasks-th partition, starting at this subtask's index
-			for (int i = subtaskIndex; i < kafkaTopicPartitions.size(); i += numSubtasks) {
-				subscribedPartitions.add(kafkaTopicPartitions.get(i));
-			}
-		}
-	}
-
-	/**
-	 * Picks, out of all given partitions, the ones that a specific consumer is responsible for.
-	 * A partition belongs to the consumer if its position in the list, modulo the number of
-	 * consumers, equals the consumer's index.
-	 *
-	 * @param allPartitions The partitions to select from
-	 * @param numConsumers The number of consumers
-	 * @param consumerIndex The index of the specific consumer
-	 *
-	 * @return The sublist of partitions to be handled by that consumer.
-	 */
-	protected static List<KafkaTopicPartition> assignPartitions(
-			List<KafkaTopicPartition> allPartitions,
-			int numConsumers, int consumerIndex) {
-		// pre-size the result for an even share of the partitions
-		final int expectedSize = allPartitions.size() / numConsumers + 1;
-		final List<KafkaTopicPartition> selected = new ArrayList<>(expectedSize);
-
-		int position = 0;
-		for (KafkaTopicPartition partition : allPartitions) {
-			if (position % numConsumers == consumerIndex) {
-				selected.add(partition);
-			}
-			position++;
-		}
-
-		return selected;
-	}
-	
-	/**
-	 * Logs, at INFO level, the topics of the given partitions together with the number of
-	 * partitions per topic.
-	 *
-	 * @param logger The logger to log to.
-	 * @param partitionInfos List of subscribed partitions
-	 */
-	protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) {
-		// count the partitions per topic
-		Map<String, Integer> countPerTopic = new HashMap<>();
-		for (KafkaTopicPartition partition : partitionInfos) {
-			final String topic = partition.getTopic();
-			final Integer previousCount = countPerTopic.get(topic);
-			countPerTopic.put(topic, previousCount == null ? 1 : previousCount + 1);
-		}
-
-		StringBuilder sb = new StringBuilder(
-				"Consumer is going to read the following topics (with number of partitions): ");
-
-		// put the separator only between entries, so the message has no dangling ", " at the end
-		boolean first = true;
-		for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
-			if (!first) {
-				sb.append(", ");
-			}
-			first = false;
-			sb.append(e.getKey()).append(" (").append(e.getValue()).append(")");
-		}
-
-		logger.info(sb.toString());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
deleted file mode 100644
index d413f1c..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
-import static java.util.Objects.requireNonNull;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic.
- *
- * Please note that this producer provides at-least-once reliability guarantees when
- * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
- * Otherwise, the producer doesn't provide any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Configuration key for disabling the metrics reporting
-	 */
-	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
-
-	/**
-	 * Array with the partition ids of the given defaultTopicId
-	 * The size of this array is the number of partitions
-	 */
-	protected int[] partitions;
-
-	/**
-	 * User defined properties for the Producer
-	 */
-	protected final Properties producerConfig;
-
-	/**
-	 * The name of the default topic this producer is writing data to
-	 */
-	protected final String defaultTopicId;
-
-	/**
-	 * (Serializable) SerializationSchema for turning objects used with Flink into
-	 * byte[] for Kafka.
-	 */
-	protected final KeyedSerializationSchema<IN> schema;
-
-	/**
-	 * User-provided partitioner for assigning an object to a Kafka partition.
-	 */
-	protected final KafkaPartitioner<IN> partitioner;
-
-	/**
-	 * Flag indicating whether to accept failures (and log them), or to fail on failures
-	 */
-	protected boolean logFailuresOnly;
-
-	/**
-	 * If true, the producer will wait until all outstanding records have been sent to the broker.
-	 */
-	protected boolean flushOnCheckpoint;
-	
-	// -------------------------------- Runtime fields ------------------------------------------
-
-	/** KafkaProducer instance */
-	protected transient KafkaProducer<byte[], byte[]> producer;
-
-	/** The callback that handles error propagation or logging callbacks */
-	protected transient Callback callback;
-
-	/** Errors encountered in the async producer are stored here */
-	protected transient volatile Exception asyncException;
-
-	/** Lock for accessing the pending records */
-	protected final SerializableObject pendingRecordsLock = new SerializableObject();
-
-	/** Number of unacknowledged records. */
-	protected long pendingRecords;
-
-	protected OperatorStateStore stateStore;
-
-
-	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
-	 *
-	 * @param defaultTopicId The default topic to write data to
-	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
-	 */
-	public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		requireNonNull(defaultTopicId, "TopicID not set");
-		requireNonNull(serializationSchema, "serializationSchema not set");
-		requireNonNull(producerConfig, "producerConfig not set");
-		ClosureCleaner.clean(customPartitioner, true);
-		ClosureCleaner.ensureSerializable(serializationSchema);
-
-		this.defaultTopicId = defaultTopicId;
-		this.schema = serializationSchema;
-		this.producerConfig = producerConfig;
-
-		// set the producer configuration properties for kafka record key value serializers.
-		if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
-		} else {
-			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-		}
-
-		if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
-		} else {
-			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-		}
-
-		// eagerly ensure that bootstrap servers are set.
-		if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
-			throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
-		}
-
-		this.partitioner = customPartitioner;
-	}
-
-	// ---------------------------------- Properties --------------------------
-
-	/**
-	 * Defines whether the producer should fail on errors, or only log them.
-	 * If this is set to true, then exceptions will be only logged, if set to false,
-	 * exceptions will be eventually thrown and cause the streaming program to 
-	 * fail (and enter recovery).
-	 * 
-	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
-	 */
-	public void setLogFailuresOnly(boolean logFailuresOnly) {
-		this.logFailuresOnly = logFailuresOnly;
-	}
-
-	/**
-	 * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
-	 * to be acknowledged by the Kafka producer on a checkpoint.
-	 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
-	 *
-	 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
-	 */
-	public void setFlushOnCheckpoint(boolean flush) {
-		this.flushOnCheckpoint = flush;
-	}
-
-	/**
-	 * Used for testing only
-	 */
-	protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) {
-		return new KafkaProducer<>(props);
-	}
-
-	// ----------------------------------- Utilities --------------------------
-	
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	@Override
-	public void open(Configuration configuration) {
-		producer = getKafkaProducer(this.producerConfig);
-
-		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
-		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
-
-		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
-		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
-			@Override
-			public int compare(PartitionInfo o1, PartitionInfo o2) {
-				return Integer.compare(o1.partition(), o2.partition());
-			}
-		});
-
-		partitions = new int[partitionsList.size()];
-		for (int i = 0; i < partitions.length; i++) {
-			partitions[i] = partitionsList.get(i).partition();
-		}
-
-		RuntimeContext ctx = getRuntimeContext();
-		if (partitioner != null) {
-			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
-		}
-
-		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", 
-				ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
-
-		// register Kafka metrics to Flink accumulators
-		if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
-			Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
-
-			if (metrics == null) {
-				// MapR's Kafka implementation returns null here.
-				LOG.info("Producer implementation does not support metrics");
-			} else {
-				final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
-				for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
-					kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
-				}
-			}
-		}
-
-		if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
-			LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
-			flushOnCheckpoint = false;
-		}
-
-		if (logFailuresOnly) {
-			// failures are only logged; the record still counts as acknowledged
-			callback = new Callback() {
-				@Override
-				public void onCompletion(RecordMetadata metadata, Exception e) {
-					if (e != null) {
-						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
-					}
-					acknowledgeMessage();
-				}
-			};
-		}
-		else {
-			// the first failure is remembered and later rethrown by checkErroneous()
-			callback = new Callback() {
-				@Override
-				public void onCompletion(RecordMetadata metadata, Exception exception) {
-					if (exception != null && asyncException == null) {
-						asyncException = exception;
-					}
-					acknowledgeMessage();
-				}
-			};
-		}
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Kafka.
-	 *
-	 * @param next
-	 * 		The incoming data
-	 */
-	@Override
-	public void invoke(IN next) throws Exception {
-		// propagate asynchronous errors
-		checkErroneous();
-
-		byte[] serializedKey = schema.serializeKey(next);
-		byte[] serializedValue = schema.serializeValue(next);
-		String targetTopic = schema.getTargetTopic(next);
-		if (targetTopic == null) {
-			targetTopic = defaultTopicId;
-		}
-
-		ProducerRecord<byte[], byte[]> record;
-		if (partitioner == null) {
-			record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
-		} else {
-			record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
-		}
-		if (flushOnCheckpoint) {
-			synchronized (pendingRecordsLock) {
-				pendingRecords++;
-			}
-		}
-		producer.send(record, callback);
-	}
-
-
-	/**
-	 * Closes the Kafka producer, if one was created, and rethrows a pending asynchronous error.
-	 */
-	@Override
-	public void close() throws Exception {
-		if (producer != null) {
-			producer.close();
-		}
-		
-		// make sure we propagate pending errors
-		checkErroneous();
-	}
-
-	// ------------------- Logic for handling checkpoint flushing -------------------------- //
-
-	/**
-	 * Invoked by the send callbacks for every completed record, whether it succeeded or failed.
-	 * Decrements the pending record count and notifies waiters on the lock once it reaches zero.
-	 */
-	private void acknowledgeMessage() {
-		if (flushOnCheckpoint) {
-			synchronized (pendingRecordsLock) {
-				pendingRecords--;
-				if (pendingRecords == 0) {
-					pendingRecordsLock.notifyAll();
-				}
-			}
-		}
-	}
-
-	/**
-	 * Flush pending records.
-	 */
-	protected abstract void flush();
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		this.stateStore = context.getOperatorStateStore();
-	}
-
-	/**
-	 * Fails the checkpoint if an asynchronous send error is pending. If flushing on checkpoints
-	 * is enabled, additionally flushes all pending records and verifies that none is left
-	 * unacknowledged.
-	 */
-	@Override
-	public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
-		// check for asynchronous errors and fail the checkpoint if necessary
-		checkErroneous();
-
-		if (flushOnCheckpoint) {
-			// flushing is activated: We need to wait until pendingRecords is 0
-			flush();
-			synchronized (pendingRecordsLock) {
-				if (pendingRecords != 0) {
-					throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
-				}
-
-				// failed sends are acknowledged as well, so a pending count of zero does not
-				// imply that every record was written: propagate errors of the flushed
-				// records, too, instead of confirming a checkpoint that lost data
-				checkErroneous();
-
-				// pending records count is 0. We can now confirm the checkpoint
-			}
-		}
-	}
-
-	// ----------------------------------- Utilities --------------------------
-
-	/**
-	 * Rethrows, wrapped in a new exception, an error that was recorded by the asynchronous send
-	 * callback. The recorded error is cleared, so that it is reported only once.
-	 *
-	 * @throws Exception If an asynchronous send error is pending.
-	 */
-	protected void checkErroneous() throws Exception {
-		Exception e = asyncException;
-		if (e != null) {
-			// prevent double throwing
-			asyncException = null;
-			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
-		}
-	}
-	
-	/**
-	 * Creates producer properties that contain only the bootstrap servers setting.
-	 *
-	 * @param brokerList Comma-separated list of broker addresses; each address is validated.
-	 * @return Properties with the bootstrap servers set to the given broker list.
-	 */
-	public static Properties getPropertiesFromBrokerList(String brokerList) {
-		String[] elements = brokerList.split(",");
-		
-		// validate the broker addresses
-		for (String broker: elements) {
-			NetUtils.getCorrectHostnamePort(broker);
-		}
-		
-		Properties props = new Properties();
-		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-		return props;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
deleted file mode 100644
index ee98783..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-/**
- * Base class for {@link KafkaTableSink} that serializes data in JSON format.
- *
- * <p>Rows are serialized with a {@link JsonRowSerializationSchema} that is created from the
- * field names of the table.
- */
-public abstract class KafkaJsonTableSink extends KafkaTableSink {
-
-	/**
-	 * Creates KafkaJsonTableSink
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 */
-	public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	/**
-	 * Creates the JSON serialization schema for rows with the given field names.
-	 *
-	 * @param fieldNames The names of the row fields.
-	 * @return A {@link JsonRowSerializationSchema} for the given field names.
-	 */
-	@Override
-	protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
-		return new JsonRowSerializationSchema(fieldNames);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
deleted file mode 100644
index f145509..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka JSON {@link StreamTableSource}.
- *
- * <p>Version-specific Kafka table sources need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
- *
- * <p>The field names and the field types are both used to parse the JSON records.
- */
-public abstract class KafkaJsonTableSource extends KafkaTableSource {
-
-	/**
-	 * Creates a generic Kafka JSON {@link StreamTableSource} with field types given as classes.
-	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param fieldNames Row field names.
-	 * @param fieldTypes Row field types.
-	 */
-	KafkaJsonTableSource(
-			String topic,
-			Properties properties,
-			String[] fieldNames,
-			Class<?>[] fieldTypes) {
-
-		super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
-	}
-
-	/**
-	 * Creates a generic Kafka JSON {@link StreamTableSource} with field types given as {@link TypeInformation}.
-	 *
-	 * @param topic      Kafka topic to consume.
-	 * @param properties Properties for the Kafka consumer.
-	 * @param fieldNames Row field names.
-	 * @param fieldTypes Row field types.
-	 */
-	KafkaJsonTableSource(
-			String topic,
-			Properties properties,
-			String[] fieldNames,
-			TypeInformation<?>[] fieldTypes) {
-
-		super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
-	}
-
-	/**
-	 * Configures the failure behaviour if a JSON field is missing.
-	 *
-	 * <p>By default, a missing field is ignored and the field is set to null.
-	 *
-	 * @param failOnMissingField Whether to fail ({@code true}) or not ({@code false}) on a missing field.
-	 */
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
-		deserializationSchema.setFailOnMissingField(failOnMissingField);
-	}
-
-	private static JsonRowDeserializationSchema createDeserializationSchema(
-			String[] fieldNames,
-			TypeInformation<?>[] fieldTypes) {
-
-		return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-	}
-
-	private static JsonRowDeserializationSchema createDeserializationSchema(
-			String[] fieldNames,
-			Class<?>[] fieldTypes) {
-
-		return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
deleted file mode 100644
index 714d9cd..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sinks.StreamTableSink;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka {@link StreamTableSink}.
- *
- * <p>Version-specific Kafka table sinks need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}.
- */
-public abstract class KafkaTableSink implements StreamTableSink<Row> {
-
-	protected final String topic;
-	protected final Properties properties;
-	protected SerializationSchema<Row> serializationSchema;
-	protected final KafkaPartitioner<Row> partitioner;
-	protected String[] fieldNames;
-	protected TypeInformation[] fieldTypes;
-
-	/**
-	 * Creates a KafkaTableSink; none of the arguments may be null.
-	 *
-	 * @param topic                 Kafka topic to write to.
-	 * @param properties            Properties for the Kafka producer.
-	 * @param partitioner           Partitioner to select the Kafka partition for each item.
-	 */
-	public KafkaTableSink(
-			String topic,
-			Properties properties,
-			KafkaPartitioner<Row> partitioner) {
-
-		this.topic = Preconditions.checkNotNull(topic, "topic");
-		this.properties = Preconditions.checkNotNull(properties, "properties");
-		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
-	}
-
-	/**
-	 * Returns the version-specific Kafka producer.
-	 *
-	 * @param topic               Kafka topic to produce to.
-	 * @param properties          Properties for the Kafka producer.
-	 * @param serializationSchema Serialization schema to use to create Kafka records.
-	 * @param partitioner         Partitioner to select Kafka partition.
-	 * @return The version-specific Kafka producer
-	 */
-	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
-		String topic, Properties properties,
-		SerializationSchema<Row> serializationSchema,
-		KafkaPartitioner<Row> partitioner);
-
-	/**
-	 * Creates the serialization schema for converting table rows into bytes.
-	 *
-	 * @param fieldNames Field names in table rows.
-	 * @return Instance of serialization schema
-	 */
-	protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
-
-	/**
-	 * Creates a deep copy of this sink.
-	 *
-	 * @return Deep copy of this sink
-	 */
-	protected abstract KafkaTableSink createCopy();
-
-	@Override
-	public void emitDataStream(DataStream<Row> dataStream) {
-		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
-		dataStream.addSink(kafkaProducer);
-	}
-
-	@Override
-	public TypeInformation<Row> getOutputType() {
-		return new RowTypeInfo(getFieldTypes());
-	}
-
-	public String[] getFieldNames() {
-		return fieldNames;
-	}
-
-	@Override
-	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
-	}
-
-	@Override
-	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-		KafkaTableSink copy = createCopy();
-		copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
-		copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
-		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-			"Number of provided field names and types does not match.");
-		copy.serializationSchema = createSerializationSchema(fieldNames);
-
-		return copy;
-	}
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
deleted file mode 100644
index fd423d7..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Properties;
-
-import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
-
-/**
- * A version-agnostic Kafka {@link StreamTableSource}.
- *
- * <p>Version-specific Kafka table sources need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
- */
-public abstract class KafkaTableSource implements StreamTableSource<Row> {
-
-	/** The Kafka topic to consume. */
-	private final String topic;
-
-	/** Properties for the Kafka consumer. */
-	private final Properties properties;
-
-	/** Deserialization schema to use for Kafka records. */
-	private final DeserializationSchema<Row> deserializationSchema;
-
-	/** Row field names. */
-	private final String[] fieldNames;
-
-	/** Row field types. */
-	private final TypeInformation<?>[] fieldTypes;
-
-	/**
-	 * Creates a generic Kafka {@link StreamTableSource} with field types given as classes.
-	 *
-	 * @param topic                 Kafka topic to consume.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @param fieldNames            Row field names.
-	 * @param fieldTypes            Row field types; converted to type information before use.
-	 */
-	KafkaTableSource(
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			String[] fieldNames,
-			Class<?>[] fieldTypes) {
-
-		this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
-	}
-
-	/**
-	 * Creates a generic Kafka {@link StreamTableSource}; none of the arguments may be null.
-	 *
-	 * @param topic                 Kafka topic to consume.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @param fieldNames            Row field names.
-	 * @param fieldTypes            Row field types; must have as many entries as the field names.
-	 */
-	KafkaTableSource(
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema,
-			String[] fieldNames,
-			TypeInformation<?>[] fieldTypes) {
-
-		this.topic = Preconditions.checkNotNull(topic, "Topic");
-		this.properties = Preconditions.checkNotNull(properties, "Properties");
-		this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
-		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
-		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
-
-		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-				"Number of provided field names and types does not match.");
-	}
-
-	/**
-	 * NOTE: This method is for internal use only for defining a TableSource.
-	 *       Do not use it in Table API programs.
-	 */
-	@Override
-	public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
-		// Version-specific Kafka consumer, added to the environment as a source
-		FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
-		DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
-		return kafkaSource;
-	}
-
-	@Override
-	public int getNumberOfFields() {
-		return fieldNames.length;
-	}
-
-	@Override
-	public String[] getFieldsNames() {
-		return fieldNames;
-	}
-
-	@Override
-	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
-	}
-
-	@Override
-	public TypeInformation<Row> getReturnType() {
-		return new RowTypeInfo(fieldTypes);
-	}
-
-	/**
-	 * Returns the version-specific Kafka consumer.
-	 *
-	 * @param topic                 Kafka topic to consume.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param deserializationSchema Deserialization schema to use for Kafka records.
-	 * @return The version-specific Kafka consumer
-	 */
-	abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
-			String topic,
-			Properties properties,
-			DeserializationSchema<Row> deserializationSchema);
-
-	/**
-	 * Returns the deserialization schema.
-	 *
-	 * @return The deserialization schema
-	 */
-	protected DeserializationSchema<Row> getDeserializationSchema() {
-		return deserializationSchema;
-	}
-}