You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:57 UTC

[07/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
deleted file mode 100644
index cf39606..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for all fetchers, which implement the connections to Kafka brokers and
- * pull records from Kafka partitions.
- * 
- * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
- * as well as around the optional timestamp assignment and watermark generation. 
- * 
- * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
- *            the Flink data streams.
- * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
- */
-public abstract class AbstractFetcher<T, KPH> {
-	
-	protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
-	protected static final int PERIODIC_WATERMARKS = 1;
-	protected static final int PUNCTUATED_WATERMARKS = 2;
-	
-	// ------------------------------------------------------------------------
-	
-	/** The source context to emit records and watermarks to */
-	protected final SourceContext<T> sourceContext;
-
-	/** The lock that guarantees that record emission and state updates are atomic,
-	 * from the view of taking a checkpoint */
-	protected final Object checkpointLock;
-
-	/** All partitions (and their state) that this fetcher is subscribed to */
-	private final KafkaTopicPartitionState<KPH>[] allPartitions;
-
-	/** The mode describing whether the fetcher also generates timestamps and watermarks */
-	protected final int timestampWatermarkMode;
-
-	/** Flag whether to register metrics for the fetcher */
-	protected final boolean useMetrics;
-
-	/** Only relevant for punctuated watermarks: The current cross partition watermark */
-	private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
-
-	// ------------------------------------------------------------------------
-	
-	protected AbstractFetcher(
-			SourceContext<T> sourceContext,
-			List<KafkaTopicPartition> assignedPartitions,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			ProcessingTimeService processingTimeProvider,
-			long autoWatermarkInterval,
-			ClassLoader userCodeClassLoader,
-			boolean useMetrics) throws Exception
-	{
-		this.sourceContext = checkNotNull(sourceContext);
-		this.checkpointLock = sourceContext.getCheckpointLock();
-		this.useMetrics = useMetrics;
-		
-		// figure out which watermark mode we will be using
-		
-		if (watermarksPeriodic == null) {
-			if (watermarksPunctuated == null) {
-				// simple case, no watermarks involved
-				timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
-			} else {
-				timestampWatermarkMode = PUNCTUATED_WATERMARKS;
-			}
-		} else {
-			if (watermarksPunctuated == null) {
-				timestampWatermarkMode = PERIODIC_WATERMARKS;
-			} else {
-				throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
-			}
-		}
-		
-		// create our partition state according to the timestamp/watermark mode 
-		this.allPartitions = initializePartitions(
-				assignedPartitions,
-				timestampWatermarkMode,
-				watermarksPeriodic, watermarksPunctuated,
-				userCodeClassLoader);
-		
-		// if we have periodic watermarks, kick off the interval scheduler
-		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
-					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
-			
-			PeriodicWatermarkEmitter periodicEmitter = 
-					new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
-			periodicEmitter.start();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets all partitions (with partition state) that this fetcher is subscribed to.
-	 *
-	 * @return All subscribed partitions.
-	 */
-	protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
-		return allPartitions;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Core fetcher work methods
-	// ------------------------------------------------------------------------
-
-	public abstract void runFetchLoop() throws Exception;
-	
-	public abstract void cancel();
-
-	// ------------------------------------------------------------------------
-	//  Kafka version specifics
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Creates the Kafka version specific representation of the given
-	 * topic partition.
-	 * 
-	 * @param partition The Flink representation of the Kafka topic partition.
-	 * @return The specific Kafka representation of the Kafka topic partition.
-	 */
-	public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);
-
-	/**
-	 * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
-	 * older Kafka versions). The given offsets are the internal checkpointed offsets, representing
-	 * the last processed record of each partition. Version-specific implementations of this method
-	 * need to hold the contract that the given offsets must be incremented by 1 before
-	 * committing them, so that committed offsets to Kafka represent "the next record to process".
-	 * 
-	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
-	 * @throws Exception This method forwards exceptions.
-	 */
-	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
-	
-	// ------------------------------------------------------------------------
-	//  snapshot and restore the state
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Takes a snapshot of the partition offsets.
-	 * 
-	 * <p>Important: This method must be called under the checkpoint lock.
-	 * 
-	 * @return A map from partition to current offset.
-	 */
-	public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
-		// this method assumes that the checkpoint lock is held
-		assert Thread.holdsLock(checkpointLock);
-
-		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-		for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
-			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
-		}
-		return state;
-	}
-
-	/**
-	 * Restores the partition offsets.
-	 * 
-	 * @param snapshotState The offsets for the partitions 
-	 */
-	public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
-		for (KafkaTopicPartitionState<?> partition : allPartitions) {
-			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				partition.setOffset(offset);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  emitting records
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Emits a record without attaching an existing timestamp to it.
-	 * 
-	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
-	 * That makes the fast path efficient, the extended paths are called as separate methods.
-	 * 
-	 * @param record The record to emit
-	 * @param partitionState The state of the Kafka partition from which the record was fetched
-	 * @param offset The offset of the record
-	 */
-	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
-		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-			// fast path logic, in case there are no watermarks
-
-			// emit the record, using the checkpoint lock to guarantee
-			// atomicity of record emission and offset state update
-			synchronized (checkpointLock) {
-				sourceContext.collect(record);
-				partitionState.setOffset(offset);
-			}
-		}
-		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
-		}
-		else {
-			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
-		}
-	}
-
-	/**
-	 * Emits a record attaching a timestamp to it.
-	 *
-	 * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
-	 * That makes the fast path efficient, the extended paths are called as separate methods.
-	 *
-	 * @param record The record to emit
-	 * @param partitionState The state of the Kafka partition from which the record was fetched
-	 * @param offset The offset of the record
-	 */
-	protected void emitRecordWithTimestamp(
-			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
-
-		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-			// fast path logic, in case there are no watermarks generated in the fetcher
-
-			// emit the record, using the checkpoint lock to guarantee
-			// atomicity of record emission and offset state update
-			synchronized (checkpointLock) {
-				sourceContext.collectWithTimestamp(record, timestamp);
-				partitionState.setOffset(offset);
-			}
-		}
-		else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-			emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
-		}
-		else {
-			emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
-		}
-	}
-
-	/**
-	 * Record emission, if a timestamp will be attached from an assigner that is
-	 * also a periodic watermark generator.
-	 */
-	protected void emitRecordWithTimestampAndPeriodicWatermark(
-			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
-	{
-		@SuppressWarnings("unchecked")
-		final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState =
-				(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
-		// extract timestamp - this accesses/modifies the per-partition state inside the
-		// watermark generator instance, so we need to lock the access on the
-		// partition state. concurrent access can happen from the periodic emitter
-		final long timestamp;
-		//noinspection SynchronizationOnLocalVariableOrMethodParameter
-		synchronized (withWatermarksState) {
-			timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-		}
-
-		// emit the record with timestamp, using the usual checkpoint lock to guarantee
-		// atomicity of record emission and offset state update 
-		synchronized (checkpointLock) {
-			sourceContext.collectWithTimestamp(record, timestamp);
-			partitionState.setOffset(offset);
-		}
-	}
-
-	/**
-	 * Record emission, if a timestamp will be attached from an assigner that is
-	 * also a punctuated watermark generator.
-	 */
-	protected void emitRecordWithTimestampAndPunctuatedWatermark(
-			T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp)
-	{
-		@SuppressWarnings("unchecked")
-		final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-				(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
-		// only one thread ever works on accessing timestamps and watermarks
-		// from the punctuated extractor
-		final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-		final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
-		// emit the record with timestamp, using the usual checkpoint lock to guarantee
-		// atomicity of record emission and offset state update 
-		synchronized (checkpointLock) {
-			sourceContext.collectWithTimestamp(record, timestamp);
-			partitionState.setOffset(offset);
-		}
-
-		// if we also have a new per-partition watermark, check if that is also a
-		// new cross-partition watermark
-		if (newWatermark != null) {
-			updateMinPunctuatedWatermark(newWatermark);
-		}
-	}
-
-	/**
-	 * Checks whether a new per-partition watermark is also a new cross-partition watermark.
-	 */
-	private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
-		if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
-			long newMin = Long.MAX_VALUE;
-
-			for (KafkaTopicPartitionState<?> state : allPartitions) {
-				@SuppressWarnings("unchecked")
-				final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
-				
-				newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark());
-			}
-
-			// double-check locking pattern
-			if (newMin > maxWatermarkSoFar) {
-				synchronized (checkpointLock) {
-					if (newMin > maxWatermarkSoFar) {
-						maxWatermarkSoFar = newMin;
-						sourceContext.emitWatermark(new Watermark(newMin));
-					}
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Utility method that takes the topic partitions and creates the topic partition state
-	 * holders. If a watermark generator per partition exists, this will also initialize those.
-	 */
-	private KafkaTopicPartitionState<KPH>[] initializePartitions(
-			List<KafkaTopicPartition> assignedPartitions,
-			int timestampWatermarkMode,
-			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			ClassLoader userCodeClassLoader)
-		throws IOException, ClassNotFoundException
-	{
-		switch (timestampWatermarkMode) {
-			
-			case NO_TIMESTAMPS_WATERMARKS: {
-				@SuppressWarnings("unchecked")
-				KafkaTopicPartitionState<KPH>[] partitions =
-						(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
-
-				int pos = 0;
-				for (KafkaTopicPartition partition : assignedPartitions) {
-					// create the kafka version specific partition handle
-					KPH kafkaHandle = createKafkaPartitionHandle(partition);
-					partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
-				}
-
-				return partitions;
-			}
-
-			case PERIODIC_WATERMARKS: {
-				@SuppressWarnings("unchecked")
-				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
-						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
-								new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
-
-				int pos = 0;
-				for (KafkaTopicPartition partition : assignedPartitions) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partition);
-
-					AssignerWithPeriodicWatermarks<T> assignerInstance =
-							watermarksPeriodic.deserializeValue(userCodeClassLoader);
-					
-					partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
-							partition, kafkaHandle, assignerInstance);
-				}
-
-				return partitions;
-			}
-
-			case PUNCTUATED_WATERMARKS: {
-				@SuppressWarnings("unchecked")
-				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
-						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
-								new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
-
-				int pos = 0;
-				for (KafkaTopicPartition partition : assignedPartitions) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partition);
-
-					AssignerWithPunctuatedWatermarks<T> assignerInstance =
-							watermarksPunctuated.deserializeValue(userCodeClassLoader);
-
-					partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
-							partition, kafkaHandle, assignerInstance);
-				}
-
-				return partitions;
-			}
-			default:
-				// cannot happen, add this as a guard for the future
-				throw new RuntimeException();
-		}
-	}
-
-	// ------------------------- Metrics ----------------------------------
-
-	/**
-	 * Add current and committed offsets to metric group
-	 *
-	 * @param metricGroup The metric group to use
-	 */
-	protected void addOffsetStateGauge(MetricGroup metricGroup) {
-		// add current and committed offsets as gauges
-		MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
-		MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
-		for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
-			currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
-			committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
-		}
-	}
-
-	/**
-	 * Gauge types
-	 */
-	private enum OffsetGaugeType {
-		CURRENT_OFFSET,
-		COMMITTED_OFFSET
-	}
-
-	/**
-	 * Gauge for getting the offset of a KafkaTopicPartitionState.
-	 */
-	private static class OffsetGauge implements Gauge<Long> {
-
-		private final KafkaTopicPartitionState<?> ktp;
-		private final OffsetGaugeType gaugeType;
-
-		public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
-			this.ktp = ktp;
-			this.gaugeType = gaugeType;
-		}
-
-		@Override
-		public Long getValue() {
-			switch(gaugeType) {
-				case COMMITTED_OFFSET:
-					return ktp.getCommittedOffset();
-				case CURRENT_OFFSET:
-					return ktp.getOffset();
-				default:
-					throw new RuntimeException("Unknown gauge type: " + gaugeType);
-			}
-		}
-	}
- 	// ------------------------------------------------------------------------
-	
-	/**
-	 * The periodic watermark emitter. In its given interval, it checks all partitions for
-	 * the current event time watermark, and possibly emits the next watermark.
-	 */
-	private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
-
-		private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
-		
-		private final SourceContext<?> emitter;
-		
-		private final ProcessingTimeService timerService;
-
-		private final long interval;
-		
-		private long lastWatermarkTimestamp;
-		
-		//-------------------------------------------------
-
-		PeriodicWatermarkEmitter(
-				KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
-				SourceContext<?> emitter,
-				ProcessingTimeService timerService,
-				long autoWatermarkInterval)
-		{
-			this.allPartitions = checkNotNull(allPartitions);
-			this.emitter = checkNotNull(emitter);
-			this.timerService = checkNotNull(timerService);
-			this.interval = autoWatermarkInterval;
-			this.lastWatermarkTimestamp = Long.MIN_VALUE;
-		}
-
-		//-------------------------------------------------
-		
-		public void start() {
-			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
-		}
-		
-		@Override
-		public void onProcessingTime(long timestamp) throws Exception {
-
-			long minAcrossAll = Long.MAX_VALUE;
-			for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
-				
-				// we access the current watermark for the periodic assigners under the state
-				// lock, to prevent concurrent modification to any internal variables
-				final long curr;
-				//noinspection SynchronizationOnLocalVariableOrMethodParameter
-				synchronized (state) {
-					curr = state.getCurrentWatermarkTimestamp();
-				}
-				
-				minAcrossAll = Math.min(minAcrossAll, curr);
-			}
-			
-			// emit next watermark, if there is one
-			if (minAcrossAll > lastWatermarkTimestamp) {
-				lastWatermarkTimestamp = minAcrossAll;
-				emitter.emitWatermark(new Watermark(minAcrossAll));
-			}
-			
-			// schedule the next watermark
-			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
deleted file mode 100644
index c736493..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import javax.annotation.Nullable;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A proxy that communicates exceptions between threads. Typically used if an exception
- * from a spawned thread needs to be recognized by the "parent" (spawner) thread.
- * 
- * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
- * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
- * Optionally, the parent can pass itself in the constructor to be interrupted as soon as
- * an exception occurs.
- * 
- * <pre>
- * {@code
- * 
- * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
- * 
- * Thread subThread = new Thread() {
- * 
- *     public void run() {
- *         try {
- *             doSomething();
- *         } catch (Throwable t) {
- *             errorProxy.reportError(t);
- *         } finally {
- *             doSomeCleanup();
- *         }
- *     }
- * };
- * subThread.start();
- * 
- * doSomethingElse();
- * errorProxy.checkAndThrowException();
- * 
- * doSomethingMore();
- * errorProxy.checkAndThrowException();
- * 
- * try {
- *     subThread.join();
- * } catch (InterruptedException e) {
- *     errorProxy.checkAndThrowException();
- *     // restore interrupted status, if not caused by an exception
- *     Thread.currentThread().interrupt();
- * }
- * }
- * </pre>
- */
-public class ExceptionProxy {
-	
-	/** The thread that should be interrupted when an exception occurs */
-	private final Thread toInterrupt;
-	
-	/** The exception to throw */ 
-	private final AtomicReference<Throwable> exception;
-
-	/**
-	 * Creates an exception proxy that interrupts the given thread upon
-	 * report of an exception. The thread to interrupt may be null.
-	 * 
-	 * @param toInterrupt The thread to interrupt upon an exception. May be null.
-	 */
-	public ExceptionProxy(@Nullable Thread toInterrupt) {
-		this.toInterrupt = toInterrupt;
-		this.exception = new AtomicReference<>();
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Sets the exception and interrupts the target thread,
-	 * if no other exception has occurred so far.
-	 * 
-	 * <p>The exception is only set (and the interruption is only triggered),
-	 * if no other exception was set before.
-	 * 
-	 * @param t The exception that occurred
-	 */
-	public void reportError(Throwable t) {
-		// set the exception, if it is the first (and the exception is non null)
-		if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) {
-			toInterrupt.interrupt();
-		}
-	}
-
-	/**
-	 * Checks whether an exception has been set via {@link #reportError(Throwable)}.
-	 * If yes, that exception is re-thrown by this method.
-	 * 
-	 * @throws Exception This method re-throws the exception, if set.
-	 */
-	public void checkAndThrowException() throws Exception {
-		Throwable t = exception.get();
-		if (t != null) {
-			if (t instanceof Exception) {
-				throw (Exception) t;
-			}
-			else if (t instanceof Error) {
-				throw (Error) t;
-			}
-			else {
-				throw new Exception(t);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
deleted file mode 100644
index c68fe28..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Flink's description of a partition in a Kafka topic.
- * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...)
- * 
- * <p>Note: This class must not change in its structure, because it would change the
- * serialization format and make previous savepoints unreadable.
- */
-public final class KafkaTopicPartition implements Serializable {
-
-	/** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
-	 * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
-	private static final long serialVersionUID = 722083576322742325L;
-	
-	// ------------------------------------------------------------------------
-
-	private final String topic;
-	private final int partition;
-	private final int cachedHash;
-
-	public KafkaTopicPartition(String topic, int partition) {
-		this.topic = requireNonNull(topic);
-		this.partition = partition;
-		this.cachedHash = 31 * topic.hashCode() + partition;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public String getTopic() {
-		return topic;
-	}
-
-	public int getPartition() {
-		return partition;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "KafkaTopicPartition{" +
-				"topic='" + topic + '\'' +
-				", partition=" + partition +
-				'}';
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		else if (o instanceof KafkaTopicPartition) {
-			KafkaTopicPartition that = (KafkaTopicPartition) o;
-			return this.partition == that.partition && this.topic.equals(that.topic);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return cachedHash;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	public static String toString(Map<KafkaTopicPartition, Long> map) {
-		StringBuilder sb = new StringBuilder();
-		for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
-			KafkaTopicPartition ktp = p.getKey();
-			sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
-		}
-		return sb.toString();
-	}
-
-	public static String toString(List<KafkaTopicPartition> partitions) {
-		StringBuilder sb = new StringBuilder();
-		for (KafkaTopicPartition p: partitions) {
-			sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
-		}
-		return sb.toString();
-	}
-
-
-	public static List<KafkaTopicPartition> dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) {
-		List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size());
-		for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
-			ret.add(ktpl.getTopicPartition());
-		}
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
deleted file mode 100644
index 1959a05..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.kafka.common.Node;
-
-import java.io.Serializable;
-
-/**
- * Serializable Topic Partition info with leader Node information.
- * This class is used at runtime.
- */
-public class KafkaTopicPartitionLeader implements Serializable {
-
-	private static final long serialVersionUID = 9145855900303748582L;
-
-	private final int leaderId;
-	private final int leaderPort;
-	private final String leaderHost;
-	private final KafkaTopicPartition topicPartition;
-	private final int cachedHash;
-
-	public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) {
-		this.topicPartition = topicPartition;
-		if (leader == null) {
-			this.leaderId = -1;
-			this.leaderHost = null;
-			this.leaderPort = -1;
-		} else {
-			this.leaderId = leader.id();
-			this.leaderPort = leader.port();
-			this.leaderHost = leader.host();
-		}
-		int cachedHash = (leader == null) ? 14 : leader.hashCode();
-		this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
-	}
-
-	public KafkaTopicPartition getTopicPartition() {
-		return topicPartition;
-	}
-
-	public Node getLeader() {
-		if (this.leaderId == -1) {
-			return null;
-		} else {
-			return new Node(leaderId, leaderHost, leaderPort);
-		}
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (!(o instanceof KafkaTopicPartitionLeader)) {
-			return false;
-		}
-
-		KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
-
-		if (!topicPartition.equals(that.topicPartition)) {
-			return false;
-		}
-		return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost);
-	}
-
-	@Override
-	public int hashCode() {
-		return cachedHash;
-	}
-
-	@Override
-	public String toString() {
-		return "KafkaTopicPartitionLeader{" +
-				"leaderId=" + leaderId +
-				", leaderPort=" + leaderPort +
-				", leaderHost='" + leaderHost + '\'' +
-				", topic=" + topicPartition.getTopic() +
-				", partition=" + topicPartition.getPartition() +
-				'}';
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
deleted file mode 100644
index 7cb5f46..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-/**
- * The state that the Flink Kafka Consumer holds for each Kafka partition.
- * Includes the Kafka descriptor for partitions.
- * 
- * <p>This class describes the most basic state (only the offset), subclasses
- * define more elaborate state, containing current watermarks and timestamp
- * extractors.
- * 
- * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
- */
-public class KafkaTopicPartitionState<KPH> {
-
-	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
-	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
-	public static final long OFFSET_NOT_SET = -915623761776L;
-	
-	// ------------------------------------------------------------------------
-
-	/** The Flink description of a Kafka partition */
-	private final KafkaTopicPartition partition;
-
-	/** The Kafka description of a Kafka partition (varies across different Kafka versions) */
-	private final KPH kafkaPartitionHandle;
-	
-	/** The offset within the Kafka partition that we already processed */
-	private volatile long offset;
-
-	/** The offset of the Kafka partition that has been committed */
-	private volatile long committedOffset;
-
-	// ------------------------------------------------------------------------
-	
-	public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
-		this.partition = partition;
-		this.kafkaPartitionHandle = kafkaPartitionHandle;
-		this.offset = OFFSET_NOT_SET;
-		this.committedOffset = OFFSET_NOT_SET;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets Flink's descriptor for the Kafka Partition.
-	 * @return The Flink partition descriptor.
-	 */
-	public final KafkaTopicPartition getKafkaTopicPartition() {
-		return partition;
-	}
-
-	/**
-	 * Gets Kafka's descriptor for the Kafka Partition.
-	 * @return The Kafka partition descriptor.
-	 */
-	public final KPH getKafkaPartitionHandle() {
-		return kafkaPartitionHandle;
-	}
-
-	public final String getTopic() {
-		return partition.getTopic();
-	}
-
-	public final int getPartition() {
-		return partition.getPartition();
-	}
-
-	/**
-	 * The current offset in the partition. This refers to the offset last element that
-	 * we retrieved and emitted successfully. It is the offset that should be stored in
-	 * a checkpoint.
-	 */
-	public final long getOffset() {
-		return offset;
-	}
-
-	public final void setOffset(long offset) {
-		this.offset = offset;
-	}
-
-	public final boolean isOffsetDefined() {
-		return offset != OFFSET_NOT_SET;
-	}
-
-	public final void setCommittedOffset(long offset) {
-		this.committedOffset = offset;
-	}
-
-	public final long getCommittedOffset() {
-		return committedOffset;
-	}
-
-	
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle
-				+ ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
deleted file mode 100644
index efdc73f..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-/**
- * A special version of the per-kafka-partition-state that additionally holds
- * a periodic watermark generator (and timestamp extractor) per partition.
- * 
- * @param <T> The type of records handled by the watermark generator
- * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
- */
-public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
-	
-	/** The timestamp assigner and watermark generator for the partition */
-	private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
-	
-	/** The last watermark timestamp generated by this partition */
-	private long partitionWatermark;
-
-	// ------------------------------------------------------------------------
-	
-	public KafkaTopicPartitionStateWithPeriodicWatermarks(
-			KafkaTopicPartition partition, KPH kafkaPartitionHandle,
-			AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
-	{
-		super(partition, kafkaPartitionHandle);
-		
-		this.timestampsAndWatermarks = timestampsAndWatermarks;
-		this.partitionWatermark = Long.MIN_VALUE;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
-		return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
-	}
-	
-	public long getCurrentWatermarkTimestamp() {
-		Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
-		if (wm != null) {
-			partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
-		}
-		return partitionWatermark;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
-				+ ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
deleted file mode 100644
index edf40ce..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import javax.annotation.Nullable;
-
-/**
- * A special version of the per-kafka-partition-state that additionally holds
- * a periodic watermark generator (and timestamp extractor) per partition.
- * 
- * <p>This class is not thread safe, but it gives volatile access to the current
- * partition watermark ({@link #getCurrentPartitionWatermark()}).
- * 
- * @param <T> The type of records handled by the watermark generator
- * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
- */
-public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
-	
-	/** The timestamp assigner and watermark generator for the partition */
-	private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
-	
-	/** The last watermark timestamp generated by this partition */
-	private volatile long partitionWatermark;
-
-	// ------------------------------------------------------------------------
-	
-	public KafkaTopicPartitionStateWithPunctuatedWatermarks(
-			KafkaTopicPartition partition, KPH kafkaPartitionHandle,
-			AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
-	{
-		super(partition, kafkaPartitionHandle);
-		
-		this.timestampsAndWatermarks = timestampsAndWatermarks;
-		this.partitionWatermark = Long.MIN_VALUE;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
-		return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
-	}
-
-	@Nullable
-	public Watermark checkAndGetNewWatermark(T record, long timestamp) {
-		Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
-		if (mark != null && mark.getTimestamp() > partitionWatermark) {
-			partitionWatermark = mark.getTimestamp();
-			return mark;
-		}
-		else {
-			return null;
-		}
-	}
-	
-	public long getCurrentPartitionWatermark() {
-		return partitionWatermark;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()
-				+ ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
deleted file mode 100644
index 7a41ade..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class TypeUtil {
-	private TypeUtil() {}
-
-	/**
-	 * Creates TypeInformation array for an array of Classes.
-	 * @param fieldTypes classes to extract type information from
-	 * @return type information
-	 */
-	public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
-		TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
-		for (int i = 0; i < fieldTypes.length; i++) {
-			typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
-		}
-		return typeInfos;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
deleted file mode 100644
index cedb696..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals.metrics;
-
-import org.apache.flink.metrics.Gauge;
-
-/**
- * Gauge for getting the current value of a Kafka metric.
- */
-public class KafkaMetricWrapper implements Gauge<Double> {
-	private final org.apache.kafka.common.Metric kafkaMetric;
-
-	public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) {
-		this.kafkaMetric = metric;
-	}
-
-	@Override
-	public Double getValue() {
-		return kafkaMetric.value();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index 9b848e0..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * 	# More Flink partitions than kafka partitions
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	----------------&gt;	1
- * 			2   --------------/
- * 			3   -------------/
- * 			4	------------/
- * </pre>
- * Some (or all) kafka partitions contain the output of more than one flink partition
- *
- *# Fewer Flink partitions than Kafka
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	----------------&gt;	1
- * 			2	----------------&gt;	2
- * 										3
- * 										4
- * 										5
- * </pre>
- *
- *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
- *
- */
-public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
-	private static final long serialVersionUID = 1627268846962918126L;
-
-	private int targetPartition = -1;
-
-	@Override
-	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {
-			throw new IllegalArgumentException();
-		}
-		
-		this.targetPartition = partitions[parallelInstanceId % partitions.length];
-	}
-
-	@Override
-	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-		if (targetPartition >= 0) {
-			return targetPartition;
-		} else {
-			throw new RuntimeException("The partitioner has not been initialized properly");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
deleted file mode 100644
index 37e2ef6..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * It contains a open() method which is called on each parallel instance.
- * Partitioners must be serializable!
- */
-public abstract class KafkaPartitioner<T> implements Serializable {
-
-	private static final long serialVersionUID = -1974260817778593473L;
-
-	/**
-	 * Initializer for the Partitioner.
-	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
-	 * @param parallelInstances the total number of parallel instances
-	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
-	 */
-	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		// overwrite this method if needed.
-	}
-
-	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
deleted file mode 100644
index d170058..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import java.io.IOException;
-
-
-/**
- * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
- */
-public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
-	private ObjectMapper mapper;
-
-	@Override
-	public ObjectNode deserialize(byte[] message) throws IOException {
-		if (mapper == null) {
-			mapper = new ObjectMapper();
-		}
-		return mapper.readValue(message, ObjectNode.class);
-	}
-
-	@Override
-	public boolean isEndOfStream(ObjectNode nextElement) {
-		return false;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
deleted file mode 100644
index 261a111..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.IOException;
-
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
-
-/**
- * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Key fields can be accessed by calling objectNode.get("key").get(&lt;name>).as(&lt;type>)
- * <p>
- * Value fields can be accessed by calling objectNode.get("value").get(&lt;name>).as(&lt;type>)
- * <p>
- * Metadata fields can be accessed by calling objectNode.get("metadata").get(&lt;name>).as(&lt;type>) and include
- * the "offset" (long), "topic" (String) and "partition" (int).
- */
-public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
-	private final boolean includeMetadata;
-	private ObjectMapper mapper;
-
-	public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
-		this.includeMetadata = includeMetadata;
-	}
-
-	@Override
-	public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-		if (mapper == null) {
-			mapper = new ObjectMapper();
-		}
-		ObjectNode node = mapper.createObjectNode();
-		node.set("key", mapper.readValue(messageKey, JsonNode.class));
-		node.set("value", mapper.readValue(message, JsonNode.class));
-		if (includeMetadata) {
-			node.putObject("metadata")
-				.put("offset", offset)
-				.put("topic", topic)
-				.put("partition", partition);
-		}
-		return node;
-	}
-
-	@Override
-	public boolean isEndOfStream(ObjectNode nextElement) {
-		return false;
-	}
-
-	@Override
-	public TypeInformation<ObjectNode> getProducedType() {
-		return getForClass(ObjectNode.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
deleted file mode 100644
index 4344810..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Deserialization schema from JSON to {@link Row}.
- *
- * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
- * the specified fields.
- *
- * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
- */
-public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
-
-	/** Field names to parse. Indices match fieldTypes indices. */
-	private final String[] fieldNames;
-
-	/** Types to parse fields as. Indices match fieldNames indices. */
-	private final TypeInformation<?>[] fieldTypes;
-
-	/** Object mapper for parsing the JSON. */
-	private final ObjectMapper objectMapper = new ObjectMapper();
-
-	/** Flag indicating whether to fail on a missing field. Defaults to false. */
-	private boolean failOnMissingField;
-
-	/**
-	 * Creates a JSON deserialization schema for the given fields and type classes.
-	 *
-	 * <p>Both arrays are validated before the type classes are converted, so a null or
-	 * mismatching argument is reported with a descriptive message.
-	 *
-	 * @param fieldNames Names of JSON fields to parse.
-	 * @param fieldTypes Type classes to parse JSON fields as.
-	 * @throws NullPointerException if one of the arrays is null
-	 * @throws IllegalArgumentException if the arrays differ in length
-	 */
-	public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
-		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
-		Preconditions.checkNotNull(fieldTypes, "Field types");
-		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-				"Number of provided field names and types does not match.");
-
-		this.fieldTypes = new TypeInformation[fieldTypes.length];
-		for (int i = 0; i < fieldTypes.length; i++) {
-			this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
-		}
-	}
-
-	/**
-	 * Creates a JSON deserialization schema for the given fields and types.
-	 *
-	 * @param fieldNames Names of JSON fields to parse.
-	 * @param fieldTypes Types to parse JSON fields as.
-	 * @throws NullPointerException if one of the arrays is null
-	 * @throws IllegalArgumentException if the arrays differ in length
-	 */
-	public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
-		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
-
-		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-				"Number of provided field names and types does not match.");
-	}
-
-	/**
-	 * Parses the message as a JSON tree and copies the configured fields into a {@link Row}.
-	 *
-	 * <p>Field {@code i} of the row holds the JSON field named {@code fieldNames[i]}, converted
-	 * to the type class of {@code fieldTypes[i]}. A field that is absent from the JSON is set to
-	 * null, unless failing on missing fields has been enabled.
-	 *
-	 * @param message the JSON message bytes
-	 * @return row with one entry per configured field
-	 * @throws IOException wrapping any failure raised while parsing or converting, including
-	 *                     the failure raised for a missing field
-	 */
-	@Override
-	public Row deserialize(byte[] message) throws IOException {
-		try {
-			JsonNode root = objectMapper.readTree(message);
-
-			Row row = new Row(fieldNames.length);
-			for (int i = 0; i < fieldNames.length; i++) {
-				JsonNode node = root.get(fieldNames[i]);
-
-				if (node == null) {
-					if (failOnMissingField) {
-						throw new IllegalStateException("Failed to find field with name '"
-								+ fieldNames[i] + "'.");
-					} else {
-						row.setField(i, null);
-					}
-				} else {
-					// Read the value as specified type
-					Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
-					row.setField(i, value);
-				}
-			}
-
-			return row;
-		} catch (Throwable t) {
-			// every failure is surfaced as IOException, as stated in the class documentation
-			throw new IOException("Failed to deserialize JSON object.", t);
-		}
-	}
-
-	/**
-	 * This schema never signals the end of the stream.
-	 *
-	 * @param nextElement the element to test (ignored)
-	 * @return always false
-	 */
-	@Override
-	public boolean isEndOfStream(Row nextElement) {
-		return false;
-	}
-
-	/**
-	 * @return row type information built from the configured field types
-	 */
-	@Override
-	public TypeInformation<Row> getProducedType() {
-		return new RowTypeInfo(fieldTypes);
-	}
-
-	/**
-	 * Configures the failure behaviour if a JSON field is missing.
-	 *
-	 * <p>By default, a missing field is ignored and the field is set to null.
-	 *
-	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
-	 */
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		this.failOnMissingField = failOnMissingField;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
deleted file mode 100644
index 077ff13..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.util.Preconditions;
-
-
-/**
- * Serialization schema that serializes an object into a JSON bytes.
- *
- * <p>Serializes the input {@link Row} object into a JSON string and
- * converts it into <code>byte[]</code>.
- *
- * <p>Result <code>byte[]</code> messages can be deserialized using
- * {@link JsonRowDeserializationSchema}.
- */
-public class JsonRowSerializationSchema implements SerializationSchema<Row> {
-	/** Names of the JSON fields; index i names element i of the input Row. */
-	private final String[] fieldNames;
-	/** Object mapper used to create the output JSON objects; shared by all instances, never reassigned. */
-	private static final ObjectMapper MAPPER = new ObjectMapper();
-
-	/**
-	 * Creates a JSON serialization schema that writes the row elements under the given field names.
-	 *
-	 * @param fieldNames Names of the JSON fields to write, in row element order.
-	 * @throws NullPointerException if fieldNames is null
-	 */
-	public JsonRowSerializationSchema(String[] fieldNames) {
-		this.fieldNames = Preconditions.checkNotNull(fieldNames);
-	}
-
-	/**
-	 * Serializes the row into the bytes of a JSON object with one field per row element.
-	 *
-	 * @param row the row to serialize; its arity must equal the number of field names
-	 * @return the JSON object as bytes
-	 * @throws IllegalStateException if the row arity differs from the number of field names
-	 * @throws RuntimeException if writing the JSON bytes fails
-	 */
-	@Override
-	public byte[] serialize(Row row) {
-		final int arity = row.productArity();
-		if (arity != fieldNames.length) {
-			throw new IllegalStateException(String.format(
-				"Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
-		}
-
-		ObjectNode objectNode = MAPPER.createObjectNode();
-
-		for (int i = 0; i < arity; i++) {
-			JsonNode node = MAPPER.valueToTree(row.productElement(i));
-			objectNode.set(fieldNames[i], node);
-		}
-
-		try {
-			return MAPPER.writeValueAsBytes(objectNode);
-		} catch (Exception e) {
-			throw new RuntimeException("Failed to serialize row", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
deleted file mode 100644
index 01e72ca..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The deserialization schema describes how to turn the byte key / value messages delivered by certain
- * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
- * processed by Flink.
- * 
- * @param <T> The type created by the keyed deserialization schema.
- */
-public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
-
-	/**
-	 * Deserializes the byte message.
-	 *
-	 * @param messageKey the key as a byte array (null if no key has been set)
-	 * @param message The message, as a byte array. (null if the message was empty or deleted)
-	 * @param topic The topic the message has originated from
-	 * @param partition The partition the message has originated from
-	 * @param offset the offset of the message in the original source (for example the Kafka offset)
-	 * @return The deserialized message as an object.
-	 * @throws IOException if an implementation fails to deserialize the message
-	 */
-	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
-
-	/**
-	 * Method to decide whether the element signals the end of the stream. If
-	 * true is returned the element won't be emitted.
-	 *
-	 * @param nextElement The element to test for the end-of-stream signal.
-	 * @return True, if the element signals end of stream, false otherwise.
-	 */
-	boolean isEndOfStream(T nextElement);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
deleted file mode 100644
index 4b9dba2..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.IOException;
-
-/**
- * A simple wrapper for using a {@link DeserializationSchema} through the
- * {@link KeyedDeserializationSchema} interface.
- *
- * <p>Only the message bytes are handed to the wrapped schema; key, topic, partition and
- * offset are ignored.
- *
- * @param <T> The type created by the deserialization schema.
- */
-public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
-
-	private static final long serialVersionUID = 2651665280744549932L;
-
-	/** The wrapped schema that all calls are delegated to. */
-	private final DeserializationSchema<T> deserializationSchema;
-
-	/**
-	 * Creates a wrapper around the given schema.
-	 *
-	 * @param deserializationSchema the schema to delegate to
-	 */
-	public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
-		this.deserializationSchema = deserializationSchema;
-	}
-
-	/**
-	 * Deserializes the message with the wrapped schema.
-	 *
-	 * @param messageKey ignored
-	 * @param message the bytes passed to the wrapped schema
-	 * @param topic ignored
-	 * @param partition ignored
-	 * @param offset ignored
-	 * @return the result of the wrapped schema's {@code deserialize(message)}
-	 * @throws IOException if the wrapped schema throws it
-	 */
-	@Override
-	public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-		return deserializationSchema.deserialize(message);
-	}
-
-	/**
-	 * @param nextElement the element to test
-	 * @return the end-of-stream decision of the wrapped schema
-	 */
-	@Override
-	public boolean isEndOfStream(T nextElement) {
-		return deserializationSchema.isEndOfStream(nextElement);
-	}
-
-	/**
-	 * @return the produced type reported by the wrapped schema
-	 */
-	@Override
-	public TypeInformation<T> getProducedType() {
-		return deserializationSchema.getProducedType();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
deleted file mode 100644
index 701281e..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-/**
- * The serialization schema describes how to turn a data object into a different serialized
- * representation. Most data sinks (for example Apache Kafka) require the data to be handed
- * to them in a specific format (for example as byte strings).
- * 
- * @param <T> The type to be serialized.
- */
-public interface KeyedSerializationSchema<T> extends Serializable {
-
-	/**
-	 * Serializes the key of the incoming element to a byte array.
-	 * This method might return null if no key is available.
-	 *
-	 * @param element The incoming element to be serialized
-	 * @return the key of the element as a byte array, or null if no key is available
-	 */
-	byte[] serializeKey(T element);
-
-
-	/**
-	 * Serializes the value of the incoming element to a byte array.
-	 *
-	 * @param element The incoming element to be serialized
-	 * @return the value of the element as a byte array
-	 */
-	byte[] serializeValue(T element);
-
-	/**
-	 * Optional method to determine the target topic for the element.
-	 *
-	 * @param element Incoming element to determine the target topic from
-	 * @return null or the target topic
-	 */
-	String getTargetTopic(T element);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
deleted file mode 100644
index 1b3e486..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-/**
- * A simple wrapper for using a {@link SerializationSchema} through the
- * {@link KeyedSerializationSchema} interface.
- *
- * <p>The wrapped schema only produces the value; this wrapper never provides a key or a
- * target topic.
- *
- * @param <T> The type to serialize
- */
-public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
-
-	private static final long serialVersionUID = 1351665280744549933L;
-
-	/** The wrapped schema that produces the value bytes. */
-	private final SerializationSchema<T> serializationSchema;
-
-	/**
-	 * Creates a wrapper around the given schema.
-	 *
-	 * @param serializationSchema the schema used to serialize values
-	 */
-	public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) {
-		this.serializationSchema = serializationSchema;
-	}
-
-	/**
-	 * @param element ignored
-	 * @return always null, since the wrapped schema does not produce keys
-	 */
-	@Override
-	public byte[] serializeKey(T element) {
-		return null;
-	}
-
-	/**
-	 * @param element the element to serialize
-	 * @return the bytes produced by the wrapped schema for the element
-	 */
-	@Override
-	public byte[] serializeValue(T element) {
-		return serializationSchema.serialize(element);
-	}
-
-	/**
-	 * @param element ignored
-	 * @return always null
-	 */
-	@Override
-	public String getTargetTopic(T element) {
-		return null; // we are never overriding the topic
-	}
-}