Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:56 UTC
[06/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
deleted file mode 100644
index 51bc8d1..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-
-/**
- * A serialization and deserialization schema for key/value pairs that uses Flink's serialization stack to
- * transform typed records from and to byte arrays.
- *
- * @param <K> The key type to be serialized.
- * @param <V> The value type to be serialized.
- */
-public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
-
- private static final long serialVersionUID = -5359448468131559102L;
-
- /** The serializer for the key */
- private final TypeSerializer<K> keySerializer;
-
- /** The serializer for the value */
- private final TypeSerializer<V> valueSerializer;
-
- /** reusable input deserialization buffer */
- private final DataInputDeserializer inputDeserializer;
-
- /** reusable output serialization buffer for the key */
- private transient DataOutputSerializer keyOutputSerializer;
-
- /** reusable output serialization buffer for the value */
- private transient DataOutputSerializer valueOutputSerializer;
-
-
- /** The type information, to be returned by {@link #getProducedType()}. It is
- * transient, because it is not serializable. Note that this means that the type information
- * is not available at runtime, but only prior to the first serialization / deserialization */
- private final transient TypeInformation<Tuple2<K, V>> typeInfo;
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new de-/serialization schema for the given types.
- *
- * @param keyTypeInfo The type information for the key type de-/serialized by this schema.
- * @param valueTypeInfo The type information for the value type de-/serialized by this schema.
- * @param ec The execution config, which is used to parametrize the type serializers.
- */
- public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
- this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
- this.keySerializer = keyTypeInfo.createSerializer(ec);
- this.valueSerializer = valueTypeInfo.createSerializer(ec);
- this.inputDeserializer = new DataInputDeserializer();
- }
-
- /**
- * Creates a new de-/serialization schema for the given types. This constructor accepts the types
- * as classes and internally constructs the type information from the classes.
- *
- * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor
- * that accepts {@link TypeInformation} instead.
- *
- * @param keyClass The class of the key de-/serialized by this schema.
- * @param valueClass The class of the value de-/serialized by this schema.
- * @param config The execution config, which is used to parametrize the type serializers.
- */
- public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
- this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config);
- }
-
- // ------------------------------------------------------------------------
-
-
- @Override
- public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- K key = null;
- V value = null;
-
- if (messageKey != null) {
- inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
- key = keySerializer.deserialize(inputDeserializer);
- }
- if (message != null) {
- inputDeserializer.setBuffer(message, 0, message.length);
- value = valueSerializer.deserialize(inputDeserializer);
- }
- return new Tuple2<>(key, value);
- }
-
- /**
- * This schema never considers an element to signal end-of-stream, so this method always returns false.
- * @param nextElement The element to test for the end-of-stream signal.
- * @return Returns false.
- */
- @Override
- public boolean isEndOfStream(Tuple2<K,V> nextElement) {
- return false;
- }
-
-
- @Override
- public byte[] serializeKey(Tuple2<K, V> element) {
- if (element.f0 == null) {
- return null;
- } else {
- // key is not null. serialize it:
- if (keyOutputSerializer == null) {
- keyOutputSerializer = new DataOutputSerializer(16);
- }
- try {
- keySerializer.serialize(element.f0, keyOutputSerializer);
- }
- catch (IOException e) {
- throw new RuntimeException("Unable to serialize record", e);
- }
- // check if key byte array size changed
- byte[] res = keyOutputSerializer.getByteArray();
- if (res.length != keyOutputSerializer.length()) {
- byte[] n = new byte[keyOutputSerializer.length()];
- System.arraycopy(res, 0, n, 0, keyOutputSerializer.length());
- res = n;
- }
- keyOutputSerializer.clear();
- return res;
- }
- }
-
- @Override
- public byte[] serializeValue(Tuple2<K, V> element) {
- // if the value is null, its serialized value is null as well.
- if (element.f1 == null) {
- return null;
- }
-
- if (valueOutputSerializer == null) {
- valueOutputSerializer = new DataOutputSerializer(16);
- }
-
- try {
- valueSerializer.serialize(element.f1, valueOutputSerializer);
- }
- catch (IOException e) {
- throw new RuntimeException("Unable to serialize record", e);
- }
-
- byte[] res = valueOutputSerializer.getByteArray();
- if (res.length != valueOutputSerializer.length()) {
- byte[] n = new byte[valueOutputSerializer.length()];
- System.arraycopy(res, 0, n, 0, valueOutputSerializer.length());
- res = n;
- }
- valueOutputSerializer.clear();
- return res;
- }
-
- @Override
- public String getTargetTopic(Tuple2<K, V> element) {
- return null; // we are never overriding the topic
- }
-
-
- @Override
- public TypeInformation<Tuple2<K,V>> getProducedType() {
- if (typeInfo != null) {
- return typeInfo;
- }
- else {
- throw new IllegalStateException(
- "The type information is not available after this class has been serialized and distributed.");
- }
- }
-}
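For context, here is a minimal usage sketch of the schema deleted above (not part of this commit): it reads typed key/value records from Kafka. The Kafka 0.9 consumer (FlinkKafkaConsumer09), topic name, and connection properties are illustrative assumptions, not taken from this diff.

import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;

public class KeyValueConsumerSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
		props.setProperty("group.id", "example-group");           // hypothetical group id

		// Key and value types are given as classes; the schema derives the
		// TypeInformation and serializers internally (see the constructors above).
		TypeInformationKeyValueSerializationSchema<String, Long> schema =
			new TypeInformationKeyValueSerializationSchema<>(String.class, Long.class, env.getConfig());

		DataStream<Tuple2<String, Long>> stream =
			env.addSource(new FlinkKafkaConsumer09<>("example-topic", schema, props));

		stream.print();
		env.execute("key-value consumer sketch");
	}
}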
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
deleted file mode 100644
index b96ba30..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class FlinkKafkaConsumerBaseTest {
-
- /**
- * Tests that the two types of timestamp extractors / watermark generators cannot both be used.
- */
- @Test
- public void testEitherWatermarkExtractor() {
- try {
- new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
- fail();
- } catch (NullPointerException ignored) {}
-
- try {
- new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
- fail();
- } catch (NullPointerException ignored) {}
-
- @SuppressWarnings("unchecked")
- final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
- @SuppressWarnings("unchecked")
- final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
-
- DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
- c1.assignTimestampsAndWatermarks(periodicAssigner);
- try {
- c1.assignTimestampsAndWatermarks(punctuatedAssigner);
- fail();
- } catch (IllegalStateException ignored) {}
-
- DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
- c2.assignTimestampsAndWatermarks(punctuatedAssigner);
- try {
- c2.assignTimestampsAndWatermarks(periodicAssigner);
- fail();
- } catch (IllegalStateException ignored) {}
- }
-
- /**
- * Tests that no checkpoints happen when the fetcher is not running.
- */
- @Test
- public void ignoreCheckpointWhenNotRunning() throws Exception {
- @SuppressWarnings("unchecked")
- final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
-
- FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
- OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
- TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
- when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
-
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
-
- assertFalse(listState.get().iterator().hasNext());
- consumer.notifyCheckpointComplete(66L);
- }
-
- /**
- * Tests that restored checkpoint offsets are snapshotted again while the fetcher is not yet running.
- */
- @Test
- public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
- OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-
- TestingListState<Serializable> listState = new TestingListState<>();
- listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L));
- listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
-
- FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
-
- when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
- when(initializationContext.isRestored()).thenReturn(true);
-
- consumer.initializeState(initializationContext);
-
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
-
- // ensure that the list was cleared and refilled. while this is an implementation detail, we use it here
- // to figure out that snapshotState() actually did something.
- Assert.assertTrue(listState.isClearCalled());
-
- Set<Serializable> expected = new HashSet<>();
-
- for (Serializable serializable : listState.get()) {
- expected.add(serializable);
- }
-
- int counter = 0;
-
- for (Serializable serializable : listState.get()) {
- assertTrue(expected.contains(serializable));
- counter++;
- }
-
- assertEquals(expected.size(), counter);
- }
-
- /**
- * Tests that snapshotting without restored state produces no offsets while the fetcher is not yet running.
- */
- @Test
- public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
- FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
-
- OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
- TestingListState<Serializable> listState = new TestingListState<>();
- when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
- when(initializationContext.isRestored()).thenReturn(false);
-
- consumer.initializeState(initializationContext);
-
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
-
- assertFalse(listState.get().iterator().hasNext());
- }
-
- /**
- * Tests that the consumer uses the fetcher when there is no restored checkpoint state.
- */
- @Test
- public void checkUseFetcherWhenNoCheckpoint() throws Exception {
-
- FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
- List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
- partitionList.add(new KafkaTopicPartition("test", 0));
- consumer.setSubscribedPartitions(partitionList);
-
- OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
- TestingListState<Serializable> listState = new TestingListState<>();
- when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-
- // make the context signal that there is no restored state, then validate that
- when(initializationContext.isRestored()).thenReturn(false);
- consumer.initializeState(initializationContext);
- consumer.run(mock(SourceFunction.SourceContext.class));
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testSnapshotState() throws Exception {
-
- // --------------------------------------------------------------------
- // prepare fake states
- // --------------------------------------------------------------------
-
- final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
- state1.put(new KafkaTopicPartition("abc", 13), 16768L);
- state1.put(new KafkaTopicPartition("def", 7), 987654321L);
-
- final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
- state2.put(new KafkaTopicPartition("abc", 13), 16770L);
- state2.put(new KafkaTopicPartition("def", 7), 987654329L);
-
- final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
- state3.put(new KafkaTopicPartition("abc", 13), 16780L);
- state3.put(new KafkaTopicPartition("def", 7), 987654377L);
-
- // --------------------------------------------------------------------
-
- final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
- when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
-
- final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
- FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
- assertEquals(0, pendingOffsetsToCommit.size());
-
- OperatorStateStore backend = mock(OperatorStateStore.class);
-
- TestingListState<Serializable> listState = new TestingListState<>();
-
- when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(backend);
- when(initializationContext.isRestored()).thenReturn(false, true, true, true);
-
- consumer.initializeState(initializationContext);
-
- // checkpoint 1
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
-
- HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
-
- for (Serializable serializable : listState.get()) {
- Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
- snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
- }
-
- assertEquals(state1, snapshot1);
- assertEquals(1, pendingOffsetsToCommit.size());
- assertEquals(state1, pendingOffsetsToCommit.get(138L));
-
- // checkpoint 2
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140));
-
- HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
-
- for (Serializable serializable : listState.get()) {
- Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
- snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
- }
-
- assertEquals(state2, snapshot2);
- assertEquals(2, pendingOffsetsToCommit.size());
- assertEquals(state2, pendingOffsetsToCommit.get(140L));
-
- // ack checkpoint 1
- consumer.notifyCheckpointComplete(138L);
- assertEquals(1, pendingOffsetsToCommit.size());
- assertTrue(pendingOffsetsToCommit.containsKey(140L));
-
- // checkpoint 3
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
-
- HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
-
- for (Serializable serializable : listState.get()) {
- Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable;
- snapshot3.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1);
- }
-
- assertEquals(state3, snapshot3);
- assertEquals(2, pendingOffsetsToCommit.size());
- assertEquals(state3, pendingOffsetsToCommit.get(141L));
-
- // ack checkpoint 3, subsumes number 2
- consumer.notifyCheckpointComplete(141L);
- assertEquals(0, pendingOffsetsToCommit.size());
-
-
- consumer.notifyCheckpointComplete(666); // invalid checkpoint
- assertEquals(0, pendingOffsetsToCommit.size());
-
- OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
- listState = new TestingListState<>();
- when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
-
- // create 500 snapshots
- for (int i = 100; i < 600; i++) {
- consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i));
- listState.clear();
- }
- assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size());
-
- // commit only the second last
- consumer.notifyCheckpointComplete(598);
- assertEquals(1, pendingOffsetsToCommit.size());
-
- // access invalid checkpoint
- consumer.notifyCheckpointComplete(590);
-
- // and the last
- consumer.notifyCheckpointComplete(599);
- assertEquals(0, pendingOffsetsToCommit.size());
- }
-
- // ------------------------------------------------------------------------
-
- private static <T> FlinkKafkaConsumerBase<T> getConsumer(
- AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
- {
- FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
-
- Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
- fetcherField.setAccessible(true);
- fetcherField.set(consumer, fetcher);
-
- Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
- mapField.setAccessible(true);
- mapField.set(consumer, pendingOffsetsToCommit);
-
- Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
- runningField.setAccessible(true);
- runningField.set(consumer, running);
-
- return consumer;
- }
-
- // ------------------------------------------------------------------------
-
- private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
- private static final long serialVersionUID = 1L;
-
- @SuppressWarnings("unchecked")
- public DummyFlinkKafkaConsumer() {
- super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
- }
-
- @Override
- protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
- AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- Assert.fail("Trying to restore offsets even though there was no restore state.");
- return null;
- }
- }).when(fetcher).restoreOffsets(any(HashMap.class));
- return fetcher;
- }
-
- @Override
- protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
- return Collections.emptyList();
- }
-
- @Override
- public RuntimeContext getRuntimeContext() {
- return mock(StreamingRuntimeContext.class);
- }
- }
-
- private static final class TestingListState<T> implements ListState<T> {
-
- private final List<T> list = new ArrayList<>();
- private boolean clearCalled = false;
-
- @Override
- public void clear() {
- list.clear();
- clearCalled = true;
- }
-
- @Override
- public Iterable<T> get() throws Exception {
- return list;
- }
-
- @Override
- public void add(T value) throws Exception {
- list.add(value);
- }
-
- public List<T> getList() {
- return list;
- }
-
- public boolean isClearCalled() {
- return clearCalled;
- }
- }
-}
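The test above drives snapshotState()/notifyCheckpointComplete() directly; in a job, the same offset bookkeeping is triggered by enabling checkpointing. A minimal sketch (not part of this commit), again with illustrative topic and property names and assuming the 0.9 consumer and SimpleStringSchema:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class CheckpointedConsumerSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Offsets are snapshotted on each checkpoint and committed back to Kafka
		// once the checkpoint completes (notifyCheckpointComplete).
		env.enableCheckpointing(5000);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
		props.setProperty("group.id", "example-group");           // hypothetical group id

		env.addSource(new FlinkKafkaConsumer09<>("example-topic", new SimpleStringSchema(), props))
			.print();

		env.execute("checkpointed consumer sketch");
	}
}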
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
deleted file mode 100644
index 2e06160..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.Assert;
-import org.junit.Test;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class FlinkKafkaProducerBaseTest {
-
- /**
- * Tests that the constructor eagerly checks that bootstrap servers are set in the config.
- */
- @Test(expected = IllegalArgumentException.class)
- public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
- // no bootstrap servers set in props
- Properties props = new Properties();
- // should throw IllegalArgumentException
- new DummyFlinkKafkaProducer<>(props, null);
- }
-
- /**
- * Tests that the constructor sets the key and value serializers in the config to ByteArraySerializer if they are not set.
- */
- @Test
- public void testKeyValueDeserializersSetIfMissing() throws Exception {
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
- // should set missing key and value serializers
- new DummyFlinkKafkaProducer<>(props, null);
-
- assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
- assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
- assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
- assertTrue(props.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
- }
-
- /**
- * Tests that the partition list is determinate (sorted) and correctly provided to the custom partitioner.
- */
- @Test
- public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
- KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
- RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
- when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
- when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-
- DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
- FakeStandardProducerConfig.get(), mockPartitioner);
- producer.setRuntimeContext(mockRuntimeContext);
-
- producer.open(new Configuration());
-
- // the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
- // which should be sorted before provided to the custom partitioner's open() method
- int[] correctPartitionList = {0, 1, 2, 3};
- verify(mockPartitioner).open(0, 1, correctPartitionList);
- }
-
- /**
- * Test ensuring that the producer is not dropping buffered records;
- * we set a timeout because the test will not finish if the logic is broken.
- */
- @Test(timeout=5000)
- public void testAtLeastOnceProducer() throws Throwable {
- runAtLeastOnceTest(true);
- }
-
- /**
- * Ensures that the at-least-once producing test fails if flushing is disabled.
- */
- @Test(expected = AssertionError.class, timeout=5000)
- public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
- runAtLeastOnceTest(false);
- }
-
- private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
- final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
- final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null, snapshottingFinished);
- producer.setFlushOnCheckpoint(flushOnCheckpoint);
-
- OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
-
- testHarness.open();
-
- for (int i = 0; i < 100; i++) {
- testHarness.processElement(new StreamRecord<>("msg-" + i));
- }
-
- // start a thread confirming all pending records
- final Tuple1<Throwable> runnableError = new Tuple1<>(null);
- final Thread threadA = Thread.currentThread();
-
- Runnable confirmer = new Runnable() {
- @Override
- public void run() {
- try {
- MockProducer mp = producer.getProducerInstance();
- List<Callback> pending = mp.getPending();
-
- // snapshot() on the main thread should block until all pending records
- // are confirmed; we cannot observe the blocking directly, so we wait
- // briefly and then verify below that nothing has been confirmed yet.
- synchronized (threadA) {
- threadA.wait(500L);
- }
- // we now check that no records have been confirmed yet
- Assert.assertEquals(100, pending.size());
- Assert.assertFalse("Snapshot method returned before all records were confirmed",
- snapshottingFinished.get());
-
- // now confirm all checkpoints
- for (Callback c: pending) {
- c.onCompletion(null, null);
- }
- pending.clear();
- } catch(Throwable t) {
- runnableError.f0 = t;
- }
- }
- };
- Thread threadB = new Thread(confirmer);
- threadB.start();
-
- // this should block:
- testHarness.snapshot(0, 0);
-
- synchronized (threadA) {
- threadA.notifyAll(); // just in case, to let the test fail faster
- }
- Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
- Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
- while (deadline.hasTimeLeft() && threadB.isAlive()) {
- threadB.join(500);
- }
- Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
- if (runnableError.f0 != null) {
- throw runnableError.f0;
- }
-
- testHarness.close();
- }
-
-
- // ------------------------------------------------------------------------
-
- private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
- private static final long serialVersionUID = 1L;
-
- private transient MockProducer prod;
- private AtomicBoolean snapshottingFinished;
-
- @SuppressWarnings("unchecked")
- public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
- super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
- this.snapshottingFinished = snapshottingFinished;
- }
-
- // constructor variant for tests unrelated to snapshotting
- @SuppressWarnings("unchecked")
- public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
- super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
- this.snapshottingFinished = new AtomicBoolean(true);
- }
-
- @Override
- protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
- this.prod = new MockProducer();
- return this.prod;
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
- // call the actual snapshot state
- super.snapshotState(ctx);
- // notify test that snapshotting has been done
- snapshottingFinished.set(true);
- }
-
- @Override
- protected void flush() {
- this.prod.flush();
- }
-
- public MockProducer getProducerInstance() {
- return this.prod;
- }
- }
-
- private static class MockProducer<K, V> extends KafkaProducer<K, V> {
- List<Callback> pendingCallbacks = new ArrayList<>();
-
- public MockProducer() {
- super(FakeStandardProducerConfig.get());
- }
-
- @Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
- throw new UnsupportedOperationException("Unexpected");
- }
-
- @Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- pendingCallbacks.add(callback);
- return null;
- }
-
- @Override
- public List<PartitionInfo> partitionsFor(String topic) {
- List<PartitionInfo> list = new ArrayList<>();
- // deliberately return an out-of-order partition list
- list.add(new PartitionInfo(topic, 3, null, null, null));
- list.add(new PartitionInfo(topic, 1, null, null, null));
- list.add(new PartitionInfo(topic, 0, null, null, null));
- list.add(new PartitionInfo(topic, 2, null, null, null));
- return list;
- }
-
- @Override
- public Map<MetricName, ? extends Metric> metrics() {
- return null;
- }
-
-
- public List<Callback> getPending() {
- return this.pendingCallbacks;
- }
-
- public void flush() {
- while (pendingCallbacks.size() > 0) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- throw new RuntimeException("Unable to flush producer, task was interrupted");
- }
- }
- }
- }
-}
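The flush-on-checkpoint behavior verified above is what gives the producer its at-least-once guarantee. A minimal configuration sketch (not part of this commit), assuming the 0.9 producer (FlinkKafkaProducer09) and illustrative names:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AtLeastOnceSinkSketch {

	/** Attaches an at-least-once Kafka sink to an existing stream of strings. */
	public static void addKafkaSink(DataStream<String> stream) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker

		FlinkKafkaProducer09<String> producer =
			new FlinkKafkaProducer09<>("example-topic", new SimpleStringSchema(), props);
		// Block in snapshotState() until all pending records are acknowledged,
		// so a completed checkpoint implies the records have reached Kafka.
		producer.setFlushOnCheckpoint(true);
		// Fail the task on send errors instead of only logging them.
		producer.setLogFailuresOnly(false);

		stream.addSink(producer);
	}
}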
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
deleted file mode 100644
index 1882a7e..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class JSONDeserializationSchemaTest {
- @Test
- public void testDeserialize() throws IOException {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode initialValue = mapper.createObjectNode();
- initialValue.put("key", 4).put("value", "world");
- byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
- JSONDeserializationSchema schema = new JSONDeserializationSchema();
- ObjectNode deserializedValue = schema.deserialize(serializedValue);
-
- Assert.assertEquals(4, deserializedValue.get("key").asInt());
- Assert.assertEquals("world", deserializedValue.get("value").asText());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
deleted file mode 100644
index 86d3105..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class JSONKeyValueDeserializationSchemaTest {
- @Test
- public void testDeserializeWithoutMetadata() throws IOException {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode initialKey = mapper.createObjectNode();
- initialKey.put("index", 4);
- byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
-
- ObjectNode initialValue = mapper.createObjectNode();
- initialValue.put("word", "world");
- byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
- JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
- ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
-
-
- Assert.assertTrue(deserializedValue.get("metadata") == null);
- Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
- Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
- }
-
- @Test
- public void testDeserializeWithMetadata() throws IOException {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode initialKey = mapper.createObjectNode();
- initialKey.put("index", 4);
- byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
-
- ObjectNode initialValue = mapper.createObjectNode();
- initialValue.put("word", "world");
- byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
- JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);
- ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4);
-
- Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
- Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
- Assert.assertEquals("topic#1", deserializedValue.get("metadata").get("topic").asText());
- Assert.assertEquals(4, deserializedValue.get("metadata").get("offset").asInt());
- Assert.assertEquals(3, deserializedValue.get("metadata").get("partition").asInt());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
deleted file mode 100644
index 68225e2..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class JsonRowDeserializationSchemaTest {
-
- /**
- * Tests simple deserialization.
- */
- @Test
- public void testDeserialization() throws Exception {
- long id = 1238123899121L;
- String name = "asdlkjasjkdla998y1122";
- byte[] bytes = new byte[1024];
- ThreadLocalRandom.current().nextBytes(bytes);
-
- ObjectMapper objectMapper = new ObjectMapper();
-
- // Root
- ObjectNode root = objectMapper.createObjectNode();
- root.put("id", id);
- root.put("name", name);
- root.put("bytes", bytes);
-
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
-
- JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
- new String[] { "id", "name", "bytes" },
- new Class<?>[] { Long.class, String.class, byte[].class });
-
- Row deserialized = deserializationSchema.deserialize(serializedJson);
-
- assertEquals(3, deserialized.productArity());
- assertEquals(id, deserialized.productElement(0));
- assertEquals(name, deserialized.productElement(1));
- assertArrayEquals(bytes, (byte[]) deserialized.productElement(2));
- }
-
- /**
- * Tests deserialization with a non-existing field name.
- */
- @Test
- public void testMissingNode() throws Exception {
- ObjectMapper objectMapper = new ObjectMapper();
-
- // Root
- ObjectNode root = objectMapper.createObjectNode();
- root.put("id", 123123123);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
-
- JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
- new String[] { "name" },
- new Class<?>[] { String.class });
-
- Row row = deserializationSchema.deserialize(serializedJson);
-
- assertEquals(1, row.productArity());
- assertNull("Missing field not null", row.productElement(0));
-
- deserializationSchema.setFailOnMissingField(true);
-
- try {
- deserializationSchema.deserialize(serializedJson);
- fail("Did not throw expected Exception");
- } catch (IOException e) {
- assertTrue(e.getCause() instanceof IllegalStateException);
- }
- }
-
- /**
- * Tests that the number of field names and types must match.
- */
- @Test
- public void testNumberOfFieldNamesAndTypesMismatch() throws Exception {
- try {
- new JsonRowDeserializationSchema(
- new String[] { "one", "two", "three" },
- new Class<?>[] { Long.class });
- fail("Did not throw expected Exception");
- } catch (IllegalArgumentException ignored) {
- // Expected
- }
-
- try {
- new JsonRowDeserializationSchema(
- new String[] { "one" },
- new Class<?>[] { Long.class, String.class });
- fail("Did not throw expected Exception");
- } catch (IllegalArgumentException ignored) {
- // Expected
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
deleted file mode 100644
index 92af15d..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-public class JsonRowSerializationSchemaTest {
- @Test
- public void testRowSerialization() throws IOException {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
- Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};
- Row row = new Row(3);
- row.setField(0, 1);
- row.setField(1, true);
- row.setField(2, "str");
-
- Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
- assertEqualRows(row, resultRow);
- }
-
- @Test
- public void testSerializationOfTwoRows() throws IOException {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
- Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};
- Row row1 = new Row(3);
- row1.setField(0, 1);
- row1.setField(1, true);
- row1.setField(2, "str");
-
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
- JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-
- byte[] bytes = serializationSchema.serialize(row1);
- assertEqualRows(row1, deserializationSchema.deserialize(bytes));
-
- Row row2 = new Row(3);
- row2.setField(0, 10);
- row2.setField(1, false);
- row2.setField(2, "newStr");
-
- bytes = serializationSchema.serialize(row2);
- assertEqualRows(row2, deserializationSchema.deserialize(bytes));
- }
-
- @Test(expected = NullPointerException.class)
- public void testInputValidation() {
- new JsonRowSerializationSchema(null);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSerializeRowWithInvalidNumberOfFields() {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
- Row row = new Row(1);
- row.setField(0, 1);
-
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
- serializationSchema.serialize(row);
- }
-
- private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row row) throws IOException {
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
- JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-
- byte[] bytes = serializationSchema.serialize(row);
- return deserializationSchema.deserialize(bytes);
- }
-
- private void assertEqualRows(Row expectedRow, Row resultRow) {
- assertEquals("Deserialized row should have expected number of fields",
- expectedRow.productArity(), resultRow.productArity());
- for (int i = 0; i < expectedRow.productArity(); i++) {
- assertEquals(String.format("Field number %d should be as in the original row", i),
- expectedRow.productElement(i), resultRow.productElement(i));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 9beed22..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
- @Test
- public void testPartitionsEqualConsumers() {
- try {
- List<KafkaTopicPartition> inPartitions = Arrays.asList(
- new KafkaTopicPartition("test-topic", 4),
- new KafkaTopicPartition("test-topic", 52),
- new KafkaTopicPartition("test-topic", 17),
- new KafkaTopicPartition("test-topic", 1));
-
- for (int i = 0; i < inPartitions.size(); i++) {
- List<KafkaTopicPartition> parts =
- FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
-
- assertNotNull(parts);
- assertEquals(1, parts.size());
- assertTrue(contains(inPartitions, parts.get(0).getPartition()));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
- for (KafkaTopicPartition ktp : inPartitions) {
- if (ktp.getPartition() == partition) {
- return true;
- }
- }
- return false;
- }
-
- @Test
- public void testMultiplePartitionsPerConsumers() {
- try {
- final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
- final List<KafkaTopicPartition> partitions = new ArrayList<>();
- final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
-
- for (int p : partitionIDs) {
- KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
- partitions.add(part);
- allPartitions.add(part);
- }
-
- final int numConsumers = 3;
- final int minPartitionsPerConsumer = partitions.size() / numConsumers;
- final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
-
- for (int i = 0; i < numConsumers; i++) {
- List<KafkaTopicPartition> parts =
- FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
-
- assertNotNull(parts);
- assertTrue(parts.size() >= minPartitionsPerConsumer);
- assertTrue(parts.size() <= maxPartitionsPerConsumer);
-
- for (KafkaTopicPartition p : parts) {
- // check that the element was actually contained
- assertTrue(allPartitions.remove(p));
- }
- }
-
- // all partitions must have been assigned
- assertTrue(allPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPartitionsFewerThanConsumers() {
- try {
- List<KafkaTopicPartition> inPartitions = Arrays.asList(
- new KafkaTopicPartition("test-topic", 4),
- new KafkaTopicPartition("test-topic", 52),
- new KafkaTopicPartition("test-topic", 17),
- new KafkaTopicPartition("test-topic", 1));
-
- final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
- allPartitions.addAll(inPartitions);
-
- final int numConsumers = 2 * inPartitions.size() + 3;
-
- for (int i = 0; i < numConsumers; i++) {
- List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
-
- assertNotNull(parts);
- assertTrue(parts.size() <= 1);
-
- for (KafkaTopicPartition p : parts) {
- // check that the element was actually contained
- assertTrue(allPartitions.remove(p));
- }
- }
-
- // all partitions must have been assigned
- assertTrue(allPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testAssignEmptyPartitions() {
- try {
- List<KafkaTopicPartition> ep = new ArrayList<>();
- List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
- assertNotNull(parts1);
- assertTrue(parts1.isEmpty());
-
- List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
- assertNotNull(parts2);
- assertTrue(parts2.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGrowingPartitionsRemainsStable() {
- try {
- final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
- List<KafkaTopicPartition> newPartitions = new ArrayList<>();
-
- for (int p : newPartitionIDs) {
- KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
- newPartitions.add(part);
- }
-
- List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7);
-
- final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions);
- final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions);
-
- final int numConsumers = 3;
- final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
- final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
- final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
- final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
-
- List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
- initialPartitions, numConsumers, 0);
- List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
- initialPartitions, numConsumers, 1);
- List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
- initialPartitions, numConsumers, 2);
-
- assertNotNull(parts1);
- assertNotNull(parts2);
- assertNotNull(parts3);
-
- assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
- for (KafkaTopicPartition p : parts1) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p));
- }
- for (KafkaTopicPartition p : parts2) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p));
- }
- for (KafkaTopicPartition p : parts3) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p));
- }
-
- // all partitions must have been assigned
- assertTrue(allInitialPartitions.isEmpty());
-
- // grow the set of partitions and distribute anew
-
- List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
- newPartitions, numConsumers, 0);
- List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
- newPartitions, numConsumers, 1);
- List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
- newPartitions, numConsumers, 2);
-
- // new partitions must include all old partitions
-
- assertTrue(parts1new.size() > parts1.size());
- assertTrue(parts2new.size() > parts2.size());
- assertTrue(parts3new.size() > parts3.size());
-
- assertTrue(parts1new.containsAll(parts1));
- assertTrue(parts2new.containsAll(parts2));
- assertTrue(parts3new.containsAll(parts3));
-
- assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
- assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
- assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
-
- for (KafkaTopicPartition p : parts1new) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p));
- }
- for (KafkaTopicPartition p : parts2new) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p));
- }
- for (KafkaTopicPartition p : parts3new) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p));
- }
-
- // all partitions must have been assigned
- assertTrue(allNewPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-}
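The property these tests check (a deterministic assignment that stays stable as partitions are appended) follows from an index-based round-robin distribution. Below is a simplified sketch of such an assignment, for illustration only; it is not the actual FlinkKafkaConsumerBase.assignPartitions implementation.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class RoundRobinAssignmentSketch {

	/** Assigns every numConsumers-th partition to the consumer with the given index. */
	public static List<KafkaTopicPartition> assign(
			List<KafkaTopicPartition> allPartitions, int numConsumers, int consumerIndex) {
		List<KafkaTopicPartition> mine = new ArrayList<>();
		for (int i = 0; i < allPartitions.size(); i++) {
			// The consumer index and list order fully determine the result, so the
			// assignment is deterministic; appending new partitions at the end only
			// adds partitions, keeping earlier assignments stable.
			if (i % numConsumers == consumerIndex) {
				mine.add(allPartitions.get(i));
			}
		}
		return mine;
	}
}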