You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:56 UTC

[06/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
deleted file mode 100644
index 51bc8d1..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-
-/**
- * A serialization and deserialization schema for Key Value Pairs that uses Flink's serialization stack to
- * transform typed from and to byte arrays.
- * 
- * @param <K> The key type to be serialized.
- * @param <V> The value type to be serialized.
- */
-public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
-
-	private static final long serialVersionUID = -5359448468131559102L;
-
-	/** The serializer for the key */
-	private final TypeSerializer<K> keySerializer;
-
-	/** The serializer for the value */
-	private final TypeSerializer<V> valueSerializer;
-
-	/** reusable input deserialization buffer */
-	private final DataInputDeserializer inputDeserializer;
-	
-	/** reusable output serialization buffer for the key */
-	private transient DataOutputSerializer keyOutputSerializer;
-
-	/** reusable output serialization buffer for the value */
-	private transient DataOutputSerializer valueOutputSerializer;
-	
-	
-	/** The type information, to be returned by {@link #getProducedType()}. It is
-	 * transient, because it is not serializable. Note that this means that the type information
-	 * is not available at runtime, but only prior to the first serialization / deserialization */
-	private final transient TypeInformation<Tuple2<K, V>> typeInfo;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new de-/serialization schema for the given types.
-	 *
-	 * @param keyTypeInfo The type information for the key type de-/serialized by this schema.
-	 * @param valueTypeInfo The type information for the value type de-/serialized by this schema.
-	 * @param ec The execution config, which is used to parametrize the type serializers.
-	 */
-	public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
-		this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
-		this.keySerializer = keyTypeInfo.createSerializer(ec);
-		this.valueSerializer = valueTypeInfo.createSerializer(ec);
-		this.inputDeserializer = new DataInputDeserializer();
-	}
-
-	/**
-	 * Creates a new de-/serialization schema for the given types. This constructor accepts the types
-	 * as classes and internally constructs the type information from the classes.
-	 * 
-	 * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor
-	 * that accepts {@link TypeInformation} instead.
-	 * 
-	 * @param keyClass The class of the key de-/serialized by this schema.
-	 * @param valueClass The class of the value de-/serialized by this schema.
-	 * @param config The execution config, which is used to parametrize the type serializers.
-	 */
-	public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
-		this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config);
-	}
-
-	// ------------------------------------------------------------------------
-
-
-	@Override
-	public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-		K key = null;
-		V value = null;
-		
-		if (messageKey != null) {
-			inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
-			key = keySerializer.deserialize(inputDeserializer);
-		}
-		if (message != null) {
-			inputDeserializer.setBuffer(message, 0, message.length);
-			value = valueSerializer.deserialize(inputDeserializer);
-		}
-		return new Tuple2<>(key, value);
-	}
-
-	/**
-	 * This schema never considers an element to signal end-of-stream, so this method returns always false.
-	 * @param nextElement The element to test for the end-of-stream signal.
-	 * @return Returns false.
-	 */
-	@Override
-	public boolean isEndOfStream(Tuple2<K,V> nextElement) {
-		return false;
-	}
-
-
-	@Override
-	public byte[] serializeKey(Tuple2<K, V> element) {
-		if (element.f0 == null) {
-			return null;
-		} else {
-			// key is not null. serialize it:
-			if (keyOutputSerializer == null) {
-				keyOutputSerializer = new DataOutputSerializer(16);
-			}
-			try {
-				keySerializer.serialize(element.f0, keyOutputSerializer);
-			}
-			catch (IOException e) {
-				throw new RuntimeException("Unable to serialize record", e);
-			}
-			// check if key byte array size changed
-			byte[] res = keyOutputSerializer.getByteArray();
-			if (res.length != keyOutputSerializer.length()) {
-				byte[] n = new byte[keyOutputSerializer.length()];
-				System.arraycopy(res, 0, n, 0, keyOutputSerializer.length());
-				res = n;
-			}
-			keyOutputSerializer.clear();
-			return res;
-		}
-	}
-
-	@Override
-	public byte[] serializeValue(Tuple2<K, V> element) {
-		// if the value is null, its serialized value is null as well.
-		if (element.f1 == null) {
-			return null;
-		}
-
-		if (valueOutputSerializer == null) {
-			valueOutputSerializer = new DataOutputSerializer(16);
-		}
-
-		try {
-			valueSerializer.serialize(element.f1, valueOutputSerializer);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Unable to serialize record", e);
-		}
-
-		byte[] res = valueOutputSerializer.getByteArray();
-		if (res.length != valueOutputSerializer.length()) {
-			byte[] n = new byte[valueOutputSerializer.length()];
-			System.arraycopy(res, 0, n, 0, valueOutputSerializer.length());
-			res = n;
-		}
-		valueOutputSerializer.clear();
-		return res;
-	}
-
-	@Override
-	public String getTargetTopic(Tuple2<K, V> element) {
-		return null; // we are never overriding the topic
-	}
-
-
-	@Override
-	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		if (typeInfo != null) {
-			return typeInfo;
-		}
-		else {
-			throw new IllegalStateException(
-					"The type information is not available after this class has been serialized and distributed.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
deleted file mode 100644
index b96ba30..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class FlinkKafkaConsumerBaseTest {
-
-	/**
-	 * Tests that not both types of timestamp extractors / watermark generators can be used.
-	 */
-	@Test
-	public void testEitherWatermarkExtractor() {
-		try {
-			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
-			fail();
-		} catch (NullPointerException ignored) {}
-
-		try {
-			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
-			fail();
-		} catch (NullPointerException ignored) {}
-		
-		@SuppressWarnings("unchecked")
-		final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
-		@SuppressWarnings("unchecked")
-		final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
-		
-		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
-		c1.assignTimestampsAndWatermarks(periodicAssigner);
-		try {
-			c1.assignTimestampsAndWatermarks(punctuatedAssigner);
-			fail();
-		} catch (IllegalStateException ignored) {}
-
-		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
-		c2.assignTimestampsAndWatermarks(punctuatedAssigner);
-		try {
-			c2.assignTimestampsAndWatermarks(periodicAssigner);
-			fail();
-		} catch (IllegalStateException ignored) {}
-	}
-
-	/**
-	 * Tests that no checkpoints happen when the fetcher is not running.
-	 */
-	@Test
-	public void ignoreCheckpointWhenNotRunning() throws Exception {
-		@SuppressWarnings("unchecked")
-		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
-
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
-		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
-
-		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
-
-		assertFalse(listState.get().iterator().hasNext());
-		consumer.notifyCheckpointComplete(66L);
-	}
-
-	/**
-	 * Tests that no checkpoints happen when the fetcher is not running.
-	 */
-	@Test
-	public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-
-		TestingListState<Serializable> listState = new TestingListState<>();
-		listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L));
-		listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
-
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
-
-		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-		when(initializationContext.isRestored()).thenReturn(true);
-
-		consumer.initializeState(initializationContext);
-
-		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
-
-		// ensure that the list was cleared and refilled. while this is an implementation detail, we use it here
-		// to figure out that snapshotState() actually did something.
-		Assert.assertTrue(listState.isClearCalled());
-
-		Set<Serializable> expected = new HashSet<>();
-
-		for (Serializable serializable : listState.get()) {
-			expected.add(serializable);
-		}
-
-		int counter = 0;
-
-		for (Serializable serializable : listState.get()) {
-			assertTrue(expected.contains(serializable));
-			counter++;
-		}
-
-		assertEquals(expected.size(), counter);
-	}
-
-	/**
-	 * Tests that no checkpoints happen when the fetcher is not running.
-	 */
-	@Test
-	public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
-
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		TestingListState<Serializable> listState = new TestingListState<>();
-		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-		when(initializationContext.isRestored()).thenReturn(false);
-
-		consumer.initializeState(initializationContext);
-
-		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
-
-		assertFalse(listState.get().iterator().hasNext());
-	}
-
-	/**
-	 * Tests that on snapshots, states and offsets to commit to Kafka are correct
-	 */
-	@Test
-	public void checkUseFetcherWhenNoCheckpoint() throws Exception {
-
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
-		List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
-		partitionList.add(new KafkaTopicPartition("test", 0));
-		consumer.setSubscribedPartitions(partitionList);
-
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		TestingListState<Serializable> listState = new TestingListState<>();
-		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-
-		// make the context signal that there is no restored state, then validate that
-		when(initializationContext.isRestored()).thenReturn(false);
-		consumer.initializeState(initializationContext);
-		consumer.run(mock(SourceFunction.SourceContext.class));
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testSnapshotState() throws Exception {
-
-		// --------------------------------------------------------------------
-		//   prepare fake states
-		// --------------------------------------------------------------------
-
-		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
-		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
-		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
-
-		final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
-		state2.put(new KafkaTopicPartition("abc", 13), 16770L);
-		state2.put(new KafkaTopicPartition("def", 7), 987654329L);
-
-		final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
-		state3.put(new KafkaTopicPartition("abc", 13), 16780L);
-		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
-
-		// --------------------------------------------------------------------
-		
-		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
-		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-			
-		final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-	
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
-		assertEquals(0, pendingOffsetsToCommit.size());
-
-		OperatorStateStore backend = mock(OperatorStateStore.class);
-
-		TestingListState<Serializable> listState = new TestingListState<>();
-
-		when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-		when(initializationContext.isRestored()).thenReturn(false, true, true, true);
-
-		consumer.initializeState(initializationContext);
-
-		// checkpoint 1
-		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
-
-		HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
-
-		for (Serializable serializable : listState.get()) {
-			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
-			snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
-		}
-
-		assertEquals(state1, snapshot1);
-		assertEquals(1, pendingOffsetsToCommit.size());
-		assertEquals(state1, pendingOffsetsToCommit.get(138L));
-
-		// checkpoint 2
-		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
-
-		HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
-
-		for (Serializable serializable : listState.get()) {
-			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
-			snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
-		}
-
-		assertEquals(state2, snapshot2);
-		assertEquals(2, pendingOffsetsToCommit.size());
-		assertEquals(state2, pendingOffsetsToCommit.get(140L));
-		
-		// ack checkpoint 1
-		consumer.notifyCheckpointComplete(138L);
-		assertEquals(1, pendingOffsetsToCommit.size());
-		assertTrue(pendingOffsetsToCommit.containsKey(140L));
-
-		// checkpoint 3
-		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
-
-		HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
-
-		for (Serializable serializable : listState.get()) {
-			Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
-			snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
-		}
-
-		assertEquals(state3, snapshot3);
-		assertEquals(2, pendingOffsetsToCommit.size());
-		assertEquals(state3, pendingOffsetsToCommit.get(141L));
-		
-		// ack checkpoint 3, subsumes number 2
-		consumer.notifyCheckpointComplete(141L);
-		assertEquals(0, pendingOffsetsToCommit.size());
-
-
-		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		assertEquals(0, pendingOffsetsToCommit.size());
-
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		listState = new TestingListState<>();
-		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
-
-		// create 500 snapshots
-		for (int i = 100; i < 600; i++) {
-			consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
-			listState.clear();
-		}
-		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size());
-
-		// commit only the second last
-		consumer.notifyCheckpointComplete(598);
-		assertEquals(1, pendingOffsetsToCommit.size());
-
-		// access invalid checkpoint
-		consumer.notifyCheckpointComplete(590);
-
-		// and the last
-		consumer.notifyCheckpointComplete(599);
-		assertEquals(0, pendingOffsetsToCommit.size());
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
-			AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
-	{
-		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
-
-		Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
-		fetcherField.setAccessible(true);
-		fetcherField.set(consumer, fetcher);
-
-		Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
-		mapField.setAccessible(true);
-		mapField.set(consumer, pendingOffsetsToCommit);
-
-		Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
-		runningField.setAccessible(true);
-		runningField.set(consumer, running);
-
-		return consumer;
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
-		private static final long serialVersionUID = 1L;
-
-		@SuppressWarnings("unchecked")
-		public DummyFlinkKafkaConsumer() {
-			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
-		}
-
-		@Override
-		protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
-			AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class);
-			doAnswer(new Answer() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					Assert.fail("Trying to restore offsets even though there was no restore state.");
-					return null;
-				}
-			}).when(fetcher).restoreOffsets(any(HashMap.class));
-			return fetcher;
-		}
-
-		@Override
-		protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
-			return Collections.emptyList();
-		}
-
-		@Override
-		public RuntimeContext getRuntimeContext() {
-			return mock(StreamingRuntimeContext.class);
-		}
-	}
-
-	private static final class TestingListState<T> implements ListState<T> {
-
-		private final List<T> list = new ArrayList<>();
-		private boolean clearCalled = false;
-
-		@Override
-		public void clear() {
-			list.clear();
-			clearCalled = true;
-		}
-
-		@Override
-		public Iterable<T> get() throws Exception {
-			return list;
-		}
-
-		@Override
-		public void add(T value) throws Exception {
-			list.add(value);
-		}
-
-		public List<T> getList() {
-			return list;
-		}
-
-		public boolean isClearCalled() {
-			return clearCalled;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
deleted file mode 100644
index 2e06160..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.Assert;
-import org.junit.Test;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class FlinkKafkaProducerBaseTest {
-
-	/**
-	 * Tests that the constructor eagerly checks bootstrap servers are set in config
-	 */
-	@Test(expected = IllegalArgumentException.class)
-	public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
-		// no bootstrap servers set in props
-		Properties props = new Properties();
-		// should throw IllegalArgumentException
-		new DummyFlinkKafkaProducer<>(props, null);
-	}
-
-	/**
-	 * Tests that constructor defaults to key value serializers in config to byte array deserializers if not set
-	 */
-	@Test
-	public void testKeyValueDeserializersSetIfMissing() throws Exception {
-		Properties props = new Properties();
-		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
-		// should set missing key value deserializers
-		new DummyFlinkKafkaProducer<>(props, null);
-
-		assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-		assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-		assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
-		assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
-	}
-
-	/**
-	 * Tests that partitions list is determinate and correctly provided to custom partitioner
-	 */
-	@Test
-	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
-		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
-		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
-		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
-		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-
-		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
-			FakeStandardProducerConfig.get(), mockPartitioner);
-		producer.setRuntimeContext(mockRuntimeContext);
-
-		producer.open(new Configuration());
-
-		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
-		// which should be sorted before provided to the custom partitioner's open() method
-		int[] correctPartitionList = {0, 1, 2, 3};
-		verify(mockPartitioner).open(0, 1, correctPartitionList);
-	}
-
-	/**
-	 * Test ensuring that the producer is not dropping buffered records.;
-	 * we set a timeout because the test will not finish if the logic is broken
-	 */
-	@Test(timeout=5000)
-	public void testAtLeastOnceProducer() throws Throwable {
-		runAtLeastOnceTest(true);
-	}
-
-	/**
-	 * Ensures that the at least once producing test fails if the flushing is disabled
-	 */
-	@Test(expected = AssertionError.class, timeout=5000)
-	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
-		runAtLeastOnceTest(false);
-	}
-
-	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
-		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
-		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null, snapshottingFinished);
-		producer.setFlushOnCheckpoint(flushOnCheckpoint);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness =
-				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
-
-		testHarness.open();
-
-		for (int i = 0; i < 100; i++) {
-			testHarness.processElement(new StreamRecord<>("msg-" + i));
-		}
-
-		// start a thread confirming all pending records
-		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
-		final Thread threadA = Thread.currentThread();
-
-		Runnable confirmer = new Runnable() {
-			@Override
-			public void run() {
-				try {
-					MockProducer mp = producer.getProducerInstance();
-					List<Callback> pending = mp.getPending();
-
-					// we need to find out if the snapshot() method blocks forever
-					// this is not possible. If snapshot() is running, it will
-					// start removing elements from the pending list.
-					synchronized (threadA) {
-						threadA.wait(500L);
-					}
-					// we now check that no records have been confirmed yet
-					Assert.assertEquals(100, pending.size());
-					Assert.assertFalse("Snapshot method returned before all records were confirmed",
-						snapshottingFinished.get());
-
-					// now confirm all checkpoints
-					for (Callback c: pending) {
-						c.onCompletion(null, null);
-					}
-					pending.clear();
-				} catch(Throwable t) {
-					runnableError.f0 = t;
-				}
-			}
-		};
-		Thread threadB = new Thread(confirmer);
-		threadB.start();
-
-		// this should block:
-		testHarness.snapshot(0, 0);
-
-		synchronized (threadA) {
-			threadA.notifyAll(); // just in case, to let the test fail faster
-		}
-		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
-		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
-		while (deadline.hasTimeLeft() && threadB.isAlive()) {
-			threadB.join(500);
-		}
-		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
-		if (runnableError.f0 != null) {
-			throw runnableError.f0;
-		}
-
-		testHarness.close();
-	}
-
-
-	// ------------------------------------------------------------------------
-
-	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
-		private static final long serialVersionUID = 1L;
-
-		private transient MockProducer prod;
-		private AtomicBoolean snapshottingFinished;
-
-		@SuppressWarnings("unchecked")
-		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
-			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
-			this.snapshottingFinished = snapshottingFinished;
-		}
-
-		// constructor variant for test irrelated to snapshotting
-		@SuppressWarnings("unchecked")
-		public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
-			super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
-			this.snapshottingFinished = new AtomicBoolean(true);
-		}
-
-		@Override
-		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
-			this.prod = new MockProducer();
-			return this.prod;
-		}
-
-		@Override
-		public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
-			// call the actual snapshot state
-			super.snapshotState(ctx);
-			// notify test that snapshotting has been done
-			snapshottingFinished.set(true);
-		}
-
-		@Override
-		protected void flush() {
-			this.prod.flush();
-		}
-
-		public MockProducer getProducerInstance() {
-			return this.prod;
-		}
-	}
-
-	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
-		List<Callback> pendingCallbacks = new ArrayList<>();
-
-		public MockProducer() {
-			super(FakeStandardProducerConfig.get());
-		}
-
-		@Override
-		public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
-			throw new UnsupportedOperationException("Unexpected");
-		}
-
-		@Override
-		public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
-			pendingCallbacks.add(callback);
-			return null;
-		}
-
-		@Override
-		public List<PartitionInfo> partitionsFor(String topic) {
-			List<PartitionInfo> list = new ArrayList<>();
-			// deliberately return an out-of-order partition list
-			list.add(new PartitionInfo(topic, 3, null, null, null));
-			list.add(new PartitionInfo(topic, 1, null, null, null));
-			list.add(new PartitionInfo(topic, 0, null, null, null));
-			list.add(new PartitionInfo(topic, 2, null, null, null));
-			return list;
-		}
-
-		@Override
-		public Map<MetricName, ? extends Metric> metrics() {
-			return null;
-		}
-
-
-		public List<Callback> getPending() {
-			return this.pendingCallbacks;
-		}
-
-		public void flush() {
-			while (pendingCallbacks.size() > 0) {
-				try {
-					Thread.sleep(10);
-				} catch (InterruptedException e) {
-					throw new RuntimeException("Unable to flush producer, task was interrupted");
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
deleted file mode 100644
index 1882a7e..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class JSONDeserializationSchemaTest {
-	@Test
-	public void testDeserialize() throws IOException {
-		ObjectMapper mapper = new ObjectMapper();
-		ObjectNode initialValue = mapper.createObjectNode();
-		initialValue.put("key", 4).put("value", "world");
-		byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
-		JSONDeserializationSchema schema = new JSONDeserializationSchema();
-		ObjectNode deserializedValue = schema.deserialize(serializedValue);
-
-		Assert.assertEquals(4, deserializedValue.get("key").asInt());
-		Assert.assertEquals("world", deserializedValue.get("value").asText());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
deleted file mode 100644
index 86d3105..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class JSONKeyValueDeserializationSchemaTest {
-	@Test
-	public void testDeserializeWithoutMetadata() throws IOException {
-		ObjectMapper mapper = new ObjectMapper();
-		ObjectNode initialKey = mapper.createObjectNode();
-		initialKey.put("index", 4);
-		byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
-
-		ObjectNode initialValue = mapper.createObjectNode();
-		initialValue.put("word", "world");
-		byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
-		JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
-		ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
-
-
-		Assert.assertTrue(deserializedValue.get("metadata") == null);
-		Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
-		Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
-	}
-
-	@Test
-	public void testDeserializeWithMetadata() throws IOException {
-		ObjectMapper mapper = new ObjectMapper();
-		ObjectNode initialKey = mapper.createObjectNode();
-		initialKey.put("index", 4);
-		byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
-
-		ObjectNode initialValue = mapper.createObjectNode();
-		initialValue.put("word", "world");
-		byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
-		JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);
-		ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4);
-
-		Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
-		Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
-		Assert.assertEquals("topic#1", deserializedValue.get("metadata").get("topic").asText());
-		Assert.assertEquals(4, deserializedValue.get("metadata").get("offset").asInt());
-		Assert.assertEquals(3, deserializedValue.get("metadata").get("partition").asInt());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
deleted file mode 100644
index 68225e2..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class JsonRowDeserializationSchemaTest {
-
-	/**
-	 * Tests simple deserialization.
-	 */
-	@Test
-	public void testDeserialization() throws Exception {
-		long id = 1238123899121L;
-		String name = "asdlkjasjkdla998y1122";
-		byte[] bytes = new byte[1024];
-		ThreadLocalRandom.current().nextBytes(bytes);
-
-		ObjectMapper objectMapper = new ObjectMapper();
-
-		// Root
-		ObjectNode root = objectMapper.createObjectNode();
-		root.put("id", id);
-		root.put("name", name);
-		root.put("bytes", bytes);
-
-		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
-
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
-				new String[] { "id", "name", "bytes" },
-				new Class<?>[] { Long.class, String.class, byte[].class });
-
-		Row deserialized = deserializationSchema.deserialize(serializedJson);
-
-		assertEquals(3, deserialized.productArity());
-		assertEquals(id, deserialized.productElement(0));
-		assertEquals(name, deserialized.productElement(1));
-		assertArrayEquals(bytes, (byte[]) deserialized.productElement(2));
-	}
-
-	/**
-	 * Tests deserialization with non-existing field name.
-	 */
-	@Test
-	public void testMissingNode() throws Exception {
-		ObjectMapper objectMapper = new ObjectMapper();
-
-		// Root
-		ObjectNode root = objectMapper.createObjectNode();
-		root.put("id", 123123123);
-		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
-
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
-				new String[] { "name" },
-				new Class<?>[] { String.class });
-
-		Row row = deserializationSchema.deserialize(serializedJson);
-
-		assertEquals(1, row.productArity());
-		assertNull("Missing field not null", row.productElement(0));
-
-		deserializationSchema.setFailOnMissingField(true);
-
-		try {
-			deserializationSchema.deserialize(serializedJson);
-			fail("Did not throw expected Exception");
-		} catch (IOException e) {
-			assertTrue(e.getCause() instanceof IllegalStateException);
-		}
-	}
-
-	/**
-	 * Tests that number of field names and types has to match.
-	 */
-	@Test
-	public void testNumberOfFieldNamesAndTypesMismatch() throws Exception {
-		try {
-			new JsonRowDeserializationSchema(
-					new String[] { "one", "two", "three" },
-					new Class<?>[] { Long.class });
-			fail("Did not throw expected Exception");
-		} catch (IllegalArgumentException ignored) {
-			// Expected
-		}
-
-		try {
-			new JsonRowDeserializationSchema(
-					new String[] { "one" },
-					new Class<?>[] { Long.class, String.class });
-			fail("Did not throw expected Exception");
-		} catch (IllegalArgumentException ignored) {
-			// Expected
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
deleted file mode 100644
index 92af15d..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-public class JsonRowSerializationSchemaTest {
-	@Test
-	public void testRowSerialization() throws IOException {
-		String[] fieldNames = new String[] {"f1", "f2", "f3"};
-		Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};
-		Row row = new Row(3);
-		row.setField(0, 1);
-		row.setField(1, true);
-		row.setField(2, "str");
-
-		Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
-		assertEqualRows(row, resultRow);
-	}
-
-	@Test
-	public void testSerializationOfTwoRows() throws IOException {
-		String[] fieldNames = new String[] {"f1", "f2", "f3"};
-		Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};
-		Row row1 = new Row(3);
-		row1.setField(0, 1);
-		row1.setField(1, true);
-		row1.setField(2, "str");
-
-		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-
-		byte[] bytes = serializationSchema.serialize(row1);
-		assertEqualRows(row1, deserializationSchema.deserialize(bytes));
-
-		Row row2 = new Row(3);
-		row2.setField(0, 10);
-		row2.setField(1, false);
-		row2.setField(2, "newStr");
-
-		bytes = serializationSchema.serialize(row2);
-		assertEqualRows(row2, deserializationSchema.deserialize(bytes));
-	}
-
-	@Test(expected = NullPointerException.class)
-	public void testInputValidation() {
-		new JsonRowSerializationSchema(null);
-	}
-
-	@Test(expected = IllegalStateException.class)
-	public void testSerializeRowWithInvalidNumberOfFields() {
-		String[] fieldNames = new String[] {"f1", "f2", "f3"};
-		Row row = new Row(1);
-		row.setField(0, 1);
-
-		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
-		serializationSchema.serialize(row);
-	}
-
-	private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row row) throws IOException {
-		JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-
-		byte[] bytes = serializationSchema.serialize(row);
-		return deserializationSchema.deserialize(bytes);
-	}
-
-	private void assertEqualRows(Row expectedRow, Row resultRow) {
-		assertEquals("Deserialized row should have expected number of fields",
-			expectedRow.productArity(), resultRow.productArity());
-		for (int i = 0; i < expectedRow.productArity(); i++) {
-			assertEquals(String.format("Field number %d should be as in the original row", i),
-				expectedRow.productElement(i), resultRow.productElement(i));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 9beed22..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
-	@Test
-	public void testPartitionsEqualConsumers() {
-		try {
-			List<KafkaTopicPartition> inPartitions = Arrays.asList(
-					new KafkaTopicPartition("test-topic", 4),
-					new KafkaTopicPartition("test-topic", 52),
-					new KafkaTopicPartition("test-topic", 17),
-					new KafkaTopicPartition("test-topic", 1));
-
-			for (int i = 0; i < inPartitions.size(); i++) {
-				List<KafkaTopicPartition> parts = 
-						FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
-
-				assertNotNull(parts);
-				assertEquals(1, parts.size());
-				assertTrue(contains(inPartitions, parts.get(0).getPartition()));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
-		for (KafkaTopicPartition ktp : inPartitions) {
-			if (ktp.getPartition() == partition) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	@Test
-	public void testMultiplePartitionsPerConsumers() {
-		try {
-			final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
-			final List<KafkaTopicPartition> partitions = new ArrayList<>();
-			final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
-
-			for (int p : partitionIDs) {
-				KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
-				partitions.add(part);
-				allPartitions.add(part);
-			}
-
-			final int numConsumers = 3;
-			final int minPartitionsPerConsumer = partitions.size() / numConsumers;
-			final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
-
-			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartition> parts = 
-						FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() >= minPartitionsPerConsumer);
-				assertTrue(parts.size() <= maxPartitionsPerConsumer);
-
-				for (KafkaTopicPartition p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPartitionsFewerThanConsumers() {
-		try {
-			List<KafkaTopicPartition> inPartitions = Arrays.asList(
-					new KafkaTopicPartition("test-topic", 4),
-					new KafkaTopicPartition("test-topic", 52),
-					new KafkaTopicPartition("test-topic", 17),
-					new KafkaTopicPartition("test-topic", 1));
-
-			final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
-			allPartitions.addAll(inPartitions);
-
-			final int numConsumers = 2 * inPartitions.size() + 3;
-
-			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() <= 1);
-
-				for (KafkaTopicPartition p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testAssignEmptyPartitions() {
-		try {
-			List<KafkaTopicPartition> ep = new ArrayList<>();
-			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
-			assertNotNull(parts1);
-			assertTrue(parts1.isEmpty());
-
-			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
-			assertNotNull(parts2);
-			assertTrue(parts2.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testGrowingPartitionsRemainsStable() {
-		try {
-			final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-			List<KafkaTopicPartition> newPartitions = new ArrayList<>();
-
-			for (int p : newPartitionIDs) {
-				KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
-				newPartitions.add(part);
-			}
-
-			List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7);
-
-			final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions);
-			final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions);
-
-			final int numConsumers = 3;
-			final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
-			final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
-			final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
-			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
-
-			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 0);
-			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 1);
-			List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 2);
-
-			assertNotNull(parts1);
-			assertNotNull(parts2);
-			assertNotNull(parts3);
-
-			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
-			for (KafkaTopicPartition p : parts1) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-			for (KafkaTopicPartition p : parts2) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-			for (KafkaTopicPartition p : parts3) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allInitialPartitions.isEmpty());
-
-			// grow the set of partitions and distribute anew
-
-			List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 0);
-			List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 1);
-			List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 2);
-
-			// new partitions must include all old partitions
-
-			assertTrue(parts1new.size() > parts1.size());
-			assertTrue(parts2new.size() > parts2.size());
-			assertTrue(parts3new.size() > parts3.size());
-
-			assertTrue(parts1new.containsAll(parts1));
-			assertTrue(parts2new.containsAll(parts2));
-			assertTrue(parts3new.containsAll(parts3));
-
-			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
-
-			for (KafkaTopicPartition p : parts1new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-			for (KafkaTopicPartition p : parts2new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-			for (KafkaTopicPartition p : parts3new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allNewPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-}