Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:03 UTC
[47/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
deleted file mode 100644
index 42a5951..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.common.TopicAndPartition;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.zookeeper.data.Stat;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Option;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class ZookeeperOffsetHandler implements OffsetHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-
- private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
-
-
- private final ZkClient zkClient;
-
- private final String groupId;
-
-
- public ZookeeperOffsetHandler(Properties props) {
- this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-
- if (this.groupId == null) {
- throw new IllegalArgumentException("Required property '"
- + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
- }
-
- String zkConnect = props.getProperty("zookeeper.connect");
- if (zkConnect == null) {
- throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
- }
-
- zkClient = new ZkClient(zkConnect,
- Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
- Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
- new ZooKeeperStringSerializer());
- }
-
-
- @Override
- public void commit(Map<TopicPartition, Long> offsetsToCommit) {
- for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
- TopicPartition tp = entry.getKey();
- long offset = entry.getValue();
-
- if (offset >= 0) {
- setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
- }
- }
- }
-
- @Override
- public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
- for (TopicPartition tp : partitions) {
- long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
-
- if (offset != OFFSET_NOT_SET) {
- LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
- tp.partition(), offset);
-
- // the offset in ZooKeeper is the last read offset; seek() expects the next offset to read.
- fetcher.seek(tp, offset + 1);
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- zkClient.close();
- }
-
- // ------------------------------------------------------------------------
- // Communication with Zookeeper
- // ------------------------------------------------------------------------
-
- public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
- TopicAndPartition tap = new TopicAndPartition(topic, partition);
- ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
- ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
- }
-
- public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
- TopicAndPartition tap = new TopicAndPartition(topic, partition);
- ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-
- scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
- topicDirs.consumerOffsetDir() + "/" + tap.partition());
-
- if (data._1().isEmpty()) {
- return OFFSET_NOT_SET;
- } else {
- return Long.valueOf(data._1().get());
- }
- }
-}
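
For reference, the handler above commits the last read offset to the standard Kafka 0.8 ZooKeeper layout and seeks the fetcher to the offset after it. The following is a minimal standalone sketch of those two conventions; the path layout follows what ZKGroupTopicDirs resolves to, and the OFFSET_NOT_SET constant is a stand-in for FlinkKafkaConsumer.OFFSET_NOT_SET, not its actual value.

// Standalone sketch of the deleted handler's conventions; no ZooKeeper needed.
public class OffsetPathSketch {

    // Stand-in for FlinkKafkaConsumer.OFFSET_NOT_SET (real value not shown here).
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    // ZKGroupTopicDirs.consumerOffsetDir() resolves to this layout in Kafka 0.8.
    static String offsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    // The committed offset is the last offset already read, so the fetcher
    // must be seeked to the next one (see seekFetcherToInitialOffsets above).
    static long seekTarget(long committedOffset) {
        return committedOffset == OFFSET_NOT_SET ? OFFSET_NOT_SET : committedOffset + 1;
    }

    public static void main(String[] args) {
        System.out.println(offsetPath("flink-group", "testCheckpointing", 0));
        // -> /consumers/flink-group/offsets/testCheckpointing/0
        System.out.println(seekTarget(49)); // -> 50, the next record to read
    }
}
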
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index 346a7d5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note: one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * # More Flink partitions than Kafka partitions
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- * --> Some (or all) Kafka partitions contain the output of more than one Flink partition.
- *
- * # Fewer Flink partitions than Kafka partitions
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- * </pre>
- *
- * --> Not all Kafka partitions contain data.
- * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this
- * will cause a lot of network connections between all the Flink instances and all the Kafka brokers).
- */
-public class FixedPartitioner extends KafkaPartitioner implements Serializable {
- private static final long serialVersionUID = 1627268846962918126L;
-
- int targetPartition = -1;
-
- @Override
- public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
- int p = 0;
- for (int i = 0; i < parallelInstances; i++) {
- if (i == parallelInstanceId) {
- targetPartition = partitions[p];
- return;
- }
- if (++p == partitions.length) {
- p = 0;
- }
- }
- }
-
- @Override
- public int partition(Object element, int numPartitions) {
- if (targetPartition == -1) {
- throw new RuntimeException("The partitioner has not been initialized properly");
- }
- return targetPartition;
- }
-}
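
The open() loop above is easiest to see with concrete numbers. Below is a small sketch that extracts the same loop into a pure function and prints the sink-to-partition mapping for an illustrative four-sinks/two-partitions setup; the class and method names are made up for the example.

// Sketch reproducing the assignment loop of FixedPartitioner.open():
// each parallel sink instance picks one Kafka partition, wrapping
// round-robin when there are more sinks than partitions.
public class FixedAssignmentDemo {

    static int targetPartition(int parallelInstanceId, int parallelInstances, int[] partitions) {
        int p = 0;
        for (int i = 0; i < parallelInstances; i++) {
            if (i == parallelInstanceId) {
                return partitions[p];
            }
            if (++p == partitions.length) {
                p = 0; // wrap: several sinks then share one partition
            }
        }
        throw new IllegalArgumentException("parallelInstanceId out of range");
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1}; // two Kafka partitions
        for (int sink = 0; sink < 4; sink++) { // four Flink sinks
            System.out.println("sink " + sink + " -> partition "
                    + targetPartition(sink, 4, partitions));
        }
        // prints: sink 0 -> 0, sink 1 -> 1, sink 2 -> 0, sink 3 -> 1
    }
}
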
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
deleted file mode 100644
index 55519f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-
-import kafka.producer.Partitioner;
-
-import java.io.Serializable;
-
-/**
- * Extended Kafka partitioner.
- * It contains an open() method, which is called once on each parallel instance.
- * Partitioners have to be serializable!
- */
-public abstract class KafkaPartitioner implements Partitioner, Serializable {
-
- private static final long serialVersionUID = -1974260817778593473L;
-
- /**
- * Initializer for the Partitioner.
- * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
- * @param parallelInstances the total number of parallel instances
- * @param partitions an array describing the partition IDs of the available Kafka partitions.
- */
- public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
- // overwrite this method if needed.
- }
-}
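
As a usage sketch against this (now removed) abstract class, a key-hash partitioner could look like the following. The class name HashKeyPartitioner is hypothetical; Math.floorMod is used instead of Math.abs to avoid the negative-hash-code pitfall.

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

// Hypothetical subclass, sketched against the abstract class above.
public class HashKeyPartitioner extends KafkaPartitioner {

    private static final long serialVersionUID = 1L;

    private int[] partitions;

    @Override
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        // remember the partition IDs that were available when the sink started
        this.partitions = partitions;
    }

    @Override
    public int partition(Object element, int numPartitions) {
        // assumes non-null elements; floorMod keeps the index non-negative
        // even for negative hash codes
        return partitions[Math.floorMod(element.hashCode(), partitions.length)];
    }
}
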
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 3d392aa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
- @Test
- public void testPartitionsEqualConsumers() {
- try {
- int[] partitions = {4, 52, 17, 1};
-
- for (int i = 0; i < partitions.length; i++) {
- List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
- partitions, "test-topic", partitions.length, i);
-
- assertNotNull(parts);
- assertEquals(1, parts.size());
- assertTrue(contains(partitions, parts.get(0).partition()));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultiplePartitionsPerConsumers() {
- try {
- final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
- final Set<Integer> allPartitions = new HashSet<>();
- for (int i : partitions) {
- allPartitions.add(i);
- }
-
- final int numConsumers = 3;
- final int minPartitionsPerConsumer = partitions.length / numConsumers;
- final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
-
- for (int i = 0; i < numConsumers; i++) {
- List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
- partitions, "test-topic", numConsumers, i);
-
- assertNotNull(parts);
- assertTrue(parts.size() >= minPartitionsPerConsumer);
- assertTrue(parts.size() <= maxPartitionsPerConsumer);
-
- for (TopicPartition p : parts) {
- // check that the element was actually contained
- assertTrue(allPartitions.remove(p.partition()));
- }
- }
-
- // all partitions must have been assigned
- assertTrue(allPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPartitionsFewerThanConsumers() {
- try {
- final int[] partitions = {4, 52, 17, 1};
-
- final Set<Integer> allPartitions = new HashSet<>();
- for (int i : partitions) {
- allPartitions.add(i);
- }
-
- final int numConsumers = 2 * partitions.length + 3;
-
- for (int i = 0; i < numConsumers; i++) {
- List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
- partitions, "test-topic", numConsumers, i);
-
- assertNotNull(parts);
- assertTrue(parts.size() <= 1);
-
- for (TopicPartition p : parts) {
- // check that the element was actually contained
- assertTrue(allPartitions.remove(p.partition()));
- }
- }
-
- // all partitions must have been assigned
- assertTrue(allPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testAssignEmptyPartitions() {
- try {
- List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
- assertNotNull(parts1);
- assertTrue(parts1.isEmpty());
-
- List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
- assertNotNull(parts2);
- assertTrue(parts2.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGrowingPartitionsRemainsStable() {
- try {
- final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
- final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
-
- final Set<Integer> allNewPartitions = new HashSet<>();
- final Set<Integer> allInitialPartitions = new HashSet<>();
- for (int i : newPartitions) {
- allNewPartitions.add(i);
- }
- for (int i : initialPartitions) {
- allInitialPartitions.add(i);
- }
-
- final int numConsumers = 3;
- final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
- final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
- final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
- final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
-
- List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
- initialPartitions, "test-topic", numConsumers, 0);
- List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
- initialPartitions, "test-topic", numConsumers, 1);
- List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
- initialPartitions, "test-topic", numConsumers, 2);
-
- assertNotNull(parts1);
- assertNotNull(parts2);
- assertNotNull(parts3);
-
- assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
- assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
- for (TopicPartition p : parts1) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p.partition()));
- }
- for (TopicPartition p : parts2) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p.partition()));
- }
- for (TopicPartition p : parts3) {
- // check that the element was actually contained
- assertTrue(allInitialPartitions.remove(p.partition()));
- }
-
- // all partitions must have been assigned
- assertTrue(allInitialPartitions.isEmpty());
-
- // grow the set of partitions and distribute anew
-
- List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
- newPartitions, "test-topic", numConsumers, 0);
- List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
- newPartitions, "test-topic", numConsumers, 1);
- List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
- newPartitions, "test-topic", numConsumers, 2);
-
- // new partitions must include all old partitions
-
- assertTrue(parts1new.size() > parts1.size());
- assertTrue(parts2new.size() > parts2.size());
- assertTrue(parts3new.size() > parts3.size());
-
- assertTrue(parts1new.containsAll(parts1));
- assertTrue(parts2new.containsAll(parts2));
- assertTrue(parts3new.containsAll(parts3));
-
- assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
- assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
- assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
- assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
-
- for (TopicPartition p : parts1new) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p.partition()));
- }
- for (TopicPartition p : parts2new) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p.partition()));
- }
- for (TopicPartition p : parts3new) {
- // check that the element was actually contained
- assertTrue(allNewPartitions.remove(p.partition()));
- }
-
- // all partitions must have been assigned
- assertTrue(allNewPartitions.isEmpty());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private static boolean contains(int[] array, int value) {
- for (int i : array) {
- if (i == value) {
- return true;
- }
- }
- return false;
- }
-}
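
These tests pin down the contract of FlinkKafkaConsumer.assignPartitions without showing its body. One assignment rule that satisfies every invariant above (deterministic, balanced to within one partition, and stable when partitions are only appended) is plain round-robin over array positions. The following is a hedged re-implementation sketch, not necessarily the exact production code.

import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {

    // Round-robin by array position: index i goes to consumer (i % numConsumers).
    // Appending new partitions never moves an existing one, so assignments of
    // old partitions stay stable as the topic grows.
    static List<Integer> assignPartitions(int[] partitions, int numConsumers, int consumerIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < partitions.length; i++) {
            if (i % numConsumers == consumerIndex) {
                assigned.add(partitions[i]);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        int[] partitions = {4, 52, 17, 1};
        for (int consumer = 0; consumer < 3; consumer++) {
            System.out.println("consumer " + consumer + " -> "
                    + assignPartitions(partitions, 3, consumer));
        }
        // consumer 0 -> [4, 1], consumer 1 -> [52], consumer 2 -> [17]
    }
}
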
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
deleted file mode 100644
index e35fcfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class KafkaConsumerTest {
-
- @Test
- public void testValidateZooKeeperConfig() {
- try {
- // empty
- Properties emptyProperties = new Properties();
- try {
- FlinkKafkaConsumer.validateZooKeeperConfig(emptyProperties);
- fail("should fail with an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
-
- // no connect string (only group string)
- Properties noConnect = new Properties();
- noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
- try {
- FlinkKafkaConsumer.validateZooKeeperConfig(noConnect);
- fail("should fail with an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
-
- // no group string (only connect string)
- Properties noGroup = new Properties();
- noGroup.put("zookeeper.connect", "localhost:47574");
- try {
- FlinkKafkaConsumer.validateZooKeeperConfig(noGroup);
- fail("should fail with an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSnapshot() {
- try {
- Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
- Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
- Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-
- offsetsField.setAccessible(true);
- runningField.setAccessible(true);
- mapField.setAccessible(true);
-
- FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
- when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
-
- long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
- LinkedMap map = new LinkedMap();
-
- offsetsField.set(consumer, testOffsets);
- runningField.set(consumer, true);
- mapField.set(consumer, map);
-
- assertTrue(map.isEmpty());
-
- // make multiple checkpoints
- for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
- long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
- assertArrayEquals(testOffsets, checkpoint);
-
- // change the offsets, make sure the snapshot did not change
- long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
-
- for (int i = 0; i < testOffsets.length; i++) {
- testOffsets[i] += 1L;
- }
-
- assertArrayEquals(checkpointCopy, checkpoint);
-
- assertTrue(map.size() > 0);
- assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- @Ignore("Kafka consumer internally makes an infinite loop")
- public void testCreateSourceWithoutCluster() {
- try {
- Properties props = new Properties();
- props.setProperty("zookeeper.connect", "localhost:56794");
- props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
- props.setProperty("group.id", "non-existent-group");
-
- new FlinkKafkaConsumer<>("no op topic", new JavaDefaultStringSchema(), props,
- FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
- FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
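
testSnapshot() above asserts two behaviors: each snapshot is a defensive copy of the live offsets, and the bookkeeping of pending checkpoints stays bounded by MAX_NUM_PENDING_CHECKPOINTS. A minimal sketch of that contract follows; the bound of 100 and the Deque-based bookkeeping are assumptions for illustration, not the consumer's actual internals.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class SnapshotSketch {

    static final int MAX_NUM_PENDING_CHECKPOINTS = 100; // assumed bound

    private long[] lastOffsets = {43, 6146, 133};
    private final Deque<long[]> pendingCheckpoints = new ArrayDeque<>();

    // Parameters mirror the tested snapshotState(checkpointId, timestamp) signature.
    long[] snapshotState(long checkpointId, long timestamp) {
        long[] copy = Arrays.copyOf(lastOffsets, lastOffsets.length); // copy, not alias
        pendingCheckpoints.addLast(copy);
        while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            pendingCheckpoints.removeFirst(); // drop oldest to stay bounded
        }
        return copy;
    }

    public static void main(String[] args) {
        SnapshotSketch s = new SnapshotSketch();
        long[] snap = s.snapshotState(1, 47);
        s.lastOffsets[0] += 1;       // mutate the live offsets...
        System.out.println(snap[0]); // ...the snapshot still prints 43
    }
}
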
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
deleted file mode 100644
index e9a5728..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ /dev/null
@@ -1,1124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
-import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
-import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.testutils.junit.RetryOnException;
-import org.apache.flink.testutils.junit.RetryRule;
-import org.apache.flink.util.Collector;
-
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.Assert;
-
-import org.junit.Rule;
-import scala.collection.Seq;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-
- @Rule
- public RetryRule retryRule = new RetryRule();
-
- // ------------------------------------------------------------------------
- // Required methods by the abstract test base
- // ------------------------------------------------------------------------
-
- protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
- String topic, DeserializationSchema<T> deserializationSchema, Properties props);
-
- // ------------------------------------------------------------------------
- // Suite of Tests
- //
- // None of the tests here is activated by an @Test annotation; they need
- // to be invoked from the extending classes. That way, each class can
- // select which tests to run.
- // ------------------------------------------------------------------------
-
- /**
- * Test that validates that checkpointing and checkpoint notification work properly.
- */
- public void runCheckpointingTest() throws Exception {
- createTestTopic("testCheckpointing", 1, 1);
-
- FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
- Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
- pendingCheckpointsField.setAccessible(true);
- LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
- Assert.assertEquals(0, pendingCheckpoints.size());
- source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
- final long[] initialOffsets = new long[] { 1337 };
-
- // first restore
- source.restoreState(initialOffsets);
-
- // then open
- source.open(new Configuration());
- long[] state1 = source.snapshotState(1, 15);
-
- assertArrayEquals(initialOffsets, state1);
-
- long[] state2 = source.snapshotState(2, 30);
- Assert.assertArrayEquals(initialOffsets, state2);
- Assert.assertEquals(2, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(1);
- Assert.assertEquals(1, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(2);
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- source.notifyCheckpointComplete(666); // invalid checkpoint
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- // create 500 snapshots
- for (int i = 100; i < 600; i++) {
- source.snapshotState(i, 15 * i);
- }
- Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
-
- // commit only the second last
- source.notifyCheckpointComplete(598);
- Assert.assertEquals(1, pendingCheckpoints.size());
-
- // access invalid checkpoint
- source.notifyCheckpointComplete(590);
-
- // and the last
- source.notifyCheckpointComplete(599);
- Assert.assertEquals(0, pendingCheckpoints.size());
-
- source.close();
-
- deleteTestTopic("testCheckpointing");
- }
-
- /**
- * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
- *
- * This test is only applicable if the Flink Kafka consumer uses the ZooKeeperOffsetHandler.
- */
- public void runOffsetInZookeeperValidationTest() throws Exception {
- LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
-
- final String topicName = "testOffsetHacking";
- final int parallelism = 3;
-
- createTestTopic(topicName, parallelism, 1);
-
- StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env1.getConfig().disableSysoutLogging();
- env1.enableCheckpointing(50);
- env1.setNumberOfExecutionRetries(0);
- env1.setParallelism(parallelism);
-
- StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env2.getConfig().disableSysoutLogging();
- env2.enableCheckpointing(50);
- env2.setNumberOfExecutionRetries(0);
- env2.setParallelism(parallelism);
-
- StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env3.getConfig().disableSysoutLogging();
- env3.enableCheckpointing(50);
- env3.setNumberOfExecutionRetries(0);
- env3.setParallelism(parallelism);
-
- // write a sequence from 0 to 99 to each of the 3 partitions.
- writeSequence(env1, topicName, 100, parallelism);
-
- readSequence(env2, standardProps, parallelism, topicName, 100, 0);
-
- ZkClient zkClient = createZookeeperClient();
-
- long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
- long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
- long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
-
- LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
- assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
- assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
- assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));
-
- LOG.info("Manipulating offsets");
-
- // set the committed offset of the three partitions to 49, so reading resumes at 50
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
-
- zkClient.close();
-
- // create new env
- readSequence(env3, standardProps, parallelism, topicName, 50, 50);
-
- deleteTestTopic(topicName);
-
- LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
- }
-
- /**
- * Ensure Kafka is working on both producer and consumer side.
- * This executes a job that contains two Flink pipelines.
- *
- * <pre>
- * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
- * </pre>
- *
- * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
- * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
- * cause the test to fail.
- */
- @RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
- public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
- LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
-
- final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
- final int parallelism = 3;
- final int elementsPerPartition = 100;
- final int totalElements = parallelism * elementsPerPartition;
-
- createTestTopic(topic, parallelism, 2);
-
- final StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.getConfig().disableSysoutLogging();
-
- TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
-
- TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
- new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
- TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
- new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
- // ----------- add producer dataflow ----------
-
- DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
-
- private boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, String>> ctx) {
- int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
- int limit = cnt + elementsPerPartition;
-
-
- while (running && cnt < limit) {
- ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
- stream.addSink(new FlinkKafkaProducer<>(brokerConnectionStrings, topic, sinkSchema));
-
- // ----------- add consumer dataflow ----------
-
- FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
-
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
-
- consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
-
- private int elCnt = 0;
- private BitSet validator = new BitSet(totalElements);
-
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
-
- assertEquals(value.f0 - 1000, (long) v);
-
- assertFalse("Received tuple twice", validator.get(v));
- validator.set(v);
- elCnt++;
-
- if (elCnt == totalElements) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != totalElements) {
- fail("The bitset was not set to 1 on all elements. Next clear:"
- + nc + " Set: " + validator);
- }
- throw new SuccessException();
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- }
- }).setParallelism(1);
-
- try {
- tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
- }
- catch (ProgramInvocationException | JobExecutionException e) {
- // look for NotLeaderForPartitionException
- Throwable cause = e.getCause();
-
- // walk the nested causes, bounded to depth 20 to avoid exception cycles
- int depth = 0;
- while (cause != null && depth++ < 20) {
- if (cause instanceof kafka.common.NotLeaderForPartitionException) {
- throw (Exception) cause;
- }
- cause = cause.getCause();
- }
- throw e;
- }
-
- LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
-
- deleteTestTopic(topic);
- }
-
- /**
- * Tests the proper consumption when having a 1:1 correspondence between Kafka partitions and
- * Flink sources.
- */
- public void runOneToOneExactlyOnceTest() throws Exception {
- LOG.info("Starting runOneToOneExactlyOnceTest()");
-
- final String topic = "oneToOneTopic";
- final int parallelism = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = parallelism * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
- createTestTopic(topic, parallelism, 1);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, parallelism, numElementsPerPartition, true);
-
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(parallelism, 1))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "One-to-one exactly once test");
-
- deleteTestTopic(topic);
- }
-
- /**
- * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
- * one Flink source will read multiple Kafka partitions.
- */
- public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
- LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
-
- final String topic = "oneToManyTopic";
- final int numPartitions = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = numPartitions * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
- final int parallelism = 2;
-
- createTestTopic(topic, numPartitions, 1);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, numPartitions, numElementsPerPartition, true);
-
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(numPartitions, 3))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "One-source-multi-partitions exactly once test");
-
- deleteTestTopic(topic);
- }
-
- /**
- * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
- * that some Flink sources will read no partitions.
- */
- public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
- LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
-
- final String topic = "manyToOneTopic";
- final int numPartitions = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = numPartitions * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
- final int parallelism = 8;
-
- createTestTopic(topic, numPartitions, 1);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, numPartitions, numElementsPerPartition, true);
-
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
- env.setBufferTimeout(0);
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(numPartitions, 1))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "multi-source-one-partitions exactly once test");
-
-
- deleteTestTopic(topic);
- }
-
-
- /**
- * Tests that the source can be properly canceled when reading full partitions.
- */
- public void runCancelingOnFullInputTest() throws Exception {
- final String topic = "cancelingOnFullTopic";
-
- final int parallelism = 3;
- createTestTopic(topic, parallelism, 1);
-
- // launch a producer thread
- DataGenerators.InfiniteStringsGenerator generator =
- new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
- generator.start();
-
- // launch a consumer asynchronously
-
- final AtomicReference<Throwable> jobError = new AtomicReference<>();
-
- final Runnable jobRunner = new Runnable() {
- @Override
- public void run() {
- try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(100);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
- env.addSource(source).addSink(new DiscardingSink<String>());
-
- env.execute();
- }
- catch (Throwable t) {
- jobError.set(t);
- }
- }
- };
-
- Thread runnerThread = new Thread(jobRunner, "program runner thread");
- runnerThread.start();
-
- // wait a bit before canceling
- Thread.sleep(2000);
-
- // cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
- // wait for the program to be done and validate that we failed with the right exception
- runnerThread.join();
-
- Throwable failureCause = jobError.get();
- assertNotNull("program did not fail properly due to canceling", failureCause);
- assertTrue(failureCause.getMessage().contains("Job was cancelled"));
-
- if (generator.isAlive()) {
- generator.shutdown();
- generator.join();
- }
- else {
- Throwable t = generator.getError();
- if (t != null) {
- t.printStackTrace();
- fail("Generator failed: " + t.getMessage());
- } else {
- fail("Generator failed with no exception");
- }
- }
-
- deleteTestTopic(topic);
- }
-
- /**
- * Tests that the source can be properly canceled when reading empty partitions.
- */
- public void runCancelingOnEmptyInputTest() throws Exception {
- final String topic = "cancelingOnEmptyInputTopic";
-
- final int parallelism = 3;
- createTestTopic(topic, parallelism, 1);
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- final Runnable jobRunner = new Runnable() {
- @Override
- public void run() {
- try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(100);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
- env.addSource(source).addSink(new DiscardingSink<String>());
-
- env.execute();
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
-
- Thread runnerThread = new Thread(jobRunner, "program runner thread");
- runnerThread.start();
-
- // wait a bit before canceling
- Thread.sleep(2000);
-
- // cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
- // wait for the program to be done and validate that we failed with the right exception
- runnerThread.join();
-
- Throwable failureCause = error.get();
- assertNotNull("program did not fail properly due to canceling", failureCause);
- assertTrue(failureCause.getMessage().contains("Job was cancelled"));
-
- deleteTestTopic(topic);
- }
-
- /**
- * Tests that the job fails cleanly at deploy time when the cluster does not have enough slots.
- */
- public void runFailOnDeployTest() throws Exception {
- final String topic = "failOnDeployTopic";
-
- createTestTopic(topic, 2, 1);
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(12); // needs to be more than the mini cluster has slots
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .addSink(new DiscardingSink<Integer>());
-
- try {
- env.execute();
- fail("this test should fail with an exception");
- }
- catch (ProgramInvocationException e) {
-
- // validate that we failed due to a NoResourceAvailableException
- Throwable cause = e.getCause();
- int depth = 0;
- boolean foundResourceException = false;
-
- while (cause != null && depth++ < 20) {
- if (cause instanceof NoResourceAvailableException) {
- foundResourceException = true;
- break;
- }
- cause = cause.getCause();
- }
-
- assertTrue("Wrong exception", foundResourceException);
- }
-
- deleteTestTopic(topic);
- }
-
- public void runInvalidOffsetTest() throws Exception {
- final String topic = "invalidOffsetTopic";
- final int parallelism = 1;
-
- // create topic
- createTestTopic(topic, parallelism, 1);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
- // write 20 messages into topic:
- writeSequence(env, topic, 20, parallelism);
-
- // set invalid offset:
- ZkClient zkClient = createZookeeperClient();
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234);
-
- // read from topic
- final int valuesCount = 20;
- final int startFrom = 0;
- readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);
-
- deleteTestTopic(topic);
- }
-
- /**
- * Tests Flink's Kafka integration with very big records (30 MB);
- * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
- */
- public void runBigRecordTestTopology() throws Exception {
- LOG.info("Starting runBigRecordTestTopology()");
-
- final String topic = "bigRecordTestTopic";
- final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
-
- createTestTopic(topic, parallelism, 1);
-
- final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
-
- final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
- new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
- final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
- new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setNumberOfExecutionRetries(0);
- env.getConfig().disableSysoutLogging();
- env.enableCheckpointing(100);
- env.setParallelism(parallelism);
-
- // add consuming topology:
- Properties consumerProps = new Properties();
- consumerProps.putAll(standardProps);
- consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
- consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
- consumerProps.setProperty("queued.max.message.chunks", "1");
-
- FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
- DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
-
- consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
-
- private int elCnt = 0;
-
- @Override
- public void invoke(Tuple2<Long, byte[]> value) throws Exception {
- elCnt++;
- if (value.f0 == -1) {
- // we should have seen 11 elements now.
- if (elCnt == 11) {
- throw new SuccessException();
- } else {
- throw new RuntimeException("There have been " + elCnt + " elements");
- }
- }
- if (elCnt > 10) {
- throw new RuntimeException("More than 10 elements seen: " + elCnt);
- }
- }
- });
-
- // add producing topology
- Properties producerProps = new Properties();
- producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
- producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
-
- DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
-
- private boolean running;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- running = true;
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
- Random rnd = new Random();
- long cnt = 0;
- int fifteenMb = 1024 * 1024 * 15;
-
- while (running) {
- byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
- ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
-
- Thread.sleep(100);
-
- if (cnt == 10) {
- // signal end
- ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
- break;
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- stream.addSink(new FlinkKafkaProducer<>(topic, deserSchema, producerProps));
-
- tryExecute(env, "big topology test");
-
- deleteTestTopic(topic);
-
- LOG.info("Finished runBigRecordTestTopology()");
- }
-
-
- public void runBrokerFailureTest() throws Exception {
- LOG.info("starting runBrokerFailureTest()");
-
- final String topic = "brokerFailureTestTopic";
-
- final int parallelism = 2;
- final int numElementsPerPartition = 1000;
- final int totalElements = parallelism * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
-
- createTestTopic(topic, parallelism, 2);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, parallelism, numElementsPerPartition, true);
-
- // find leader to shut down
- ZkClient zkClient = createZookeeperClient();
- PartitionMetadata firstPart = null;
- do {
- if (firstPart != null) {
- LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
- // not the first try. Sleep a bit
- Thread.sleep(150);
- }
-
- Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
- firstPart = partitionMetadata.head();
- }
- while (firstPart.errorCode() != 0);
- zkClient.close();
-
- final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
- final String leaderToShutDownConnection =
- NetUtils.hostAndPortToUrlString(leaderToShutDown.host(), leaderToShutDown.port());
-
-
- final int leaderIdToShutDown = leaderToShutDown.id();
- LOG.info("Leader to shut down: {}", leaderToShutDown);
-
-
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(500);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
-
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(parallelism, 1))
- .map(new BrokerKillingMapper<Integer>(leaderToShutDownConnection, failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- BrokerKillingMapper.killedLeaderBefore = false;
- tryExecute(env, "One-to-one exactly once test");
-
- // start a new broker:
- brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
-
- LOG.info("finished runBrokerFailureTest()");
- }
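-
- /*
- * Illustrative sketch (hypothetical helper, not part of the original file): the leader
- * lookup above is a poll-until-clean loop. Generalized, it retries the metadata fetch
- * until the first partition reports error code 0, backing off briefly between attempts.
- */
- private static PartitionMetadata waitForCleanLeader(String topic, ZkClient zkClient) throws InterruptedException {
- PartitionMetadata part = null;
- do {
- if (part != null) {
- Thread.sleep(150); // not the first attempt, back off before retrying
- }
- part = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata().head();
- } while (part.errorCode() != 0);
- return part;
- }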
-
- // ------------------------------------------------------------------------
- // Reading / writing test data sets
- // ------------------------------------------------------------------------
-
- private void readSequence(StreamExecutionEnvironment env, Properties cc,
- final int sourceParallelism,
- final String topicName,
- final int valuesCount, final int startFrom) throws Exception {
-
- final int finalCount = valuesCount * sourceParallelism;
-
- final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
- new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
-
- // create the consumer
- FlinkKafkaConsumer<Tuple2<Integer, Integer>> consumer = getConsumer(topicName, deser, cc);
-
- DataStream<Tuple2<Integer, Integer>> source = env
- .addSource(consumer).setParallelism(sourceParallelism)
- .map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
-
- // verify data
- source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
-
- private int[] values = new int[valuesCount];
- private int count = 0;
-
- @Override
- public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
- values[value.f1 - startFrom]++;
- count++;
-
- // verify if we've seen everything
- if (count == finalCount) {
- for (int i = 0; i < values.length; i++) {
- int v = values[i];
- if (v != sourceParallelism) {
- printTopic(topicName, valuesCount, deser);
- throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
- }
- }
- // test has passed
- throw new SuccessException();
- }
- }
-
- }).setParallelism(1);
-
- tryExecute(env, "Read data from Kafka");
-
- LOG.info("Successfully read sequence for verification");
- }
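-
- /*
- * Illustrative sketch (hypothetical helper, not part of the original file): the flatMap
- * verification above is a plain histogram check. Each expected value must be seen exactly
- * once per parallel source subtask, so every counter must equal the source parallelism.
- */
- private static void checkHistogram(int[] counts, int expectedMultiplicity) {
- for (int i = 0; i < counts.length; i++) {
- if (counts[i] != expectedMultiplicity) {
- throw new RuntimeException("Value " + i + " seen " + counts[i]
- + " times, expected " + expectedMultiplicity + " times");
- }
- }
- }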
-
- private static void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
-
- TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
- DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
- private boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
- int cnt = 0;
- int partition = getRuntimeContext().getIndexOfThisSubtask();
-
- while (running && cnt < numElements) {
- ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }).setParallelism(parallelism);
-
- stream.addSink(new FlinkKafkaProducer<>(topicName,
- new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
- FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings),
- new Tuple2Partitioner(parallelism)
- )).setParallelism(parallelism);
-
- env.execute("Write sequence");
-
- LOG.info("Finished writing sequence");
- }
-
- // ------------------------------------------------------------------------
- // Debugging utilities
- // ------------------------------------------------------------------------
-
- /**
- * Read topic to list, only using Kafka code.
- */
- private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
- ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
- // we request only one stream per consumer instance. Kafka makes sure that, within a consumer
- // group, each message is delivered to exactly one of the group's consumers.
- Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1);
- Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
- if (streams.size() != 1) {
- throw new RuntimeException("Expected only one message stream but got " + streams.size());
- }
- List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
- if (kafkaStreams == null) {
- throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
- }
- if (kafkaStreams.size() != 1) {
- throw new RuntimeException("Requested 1 stream from Kafka, but got " + kafkaStreams.size() + " streams");
- }
- LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
- ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
- List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
- int read = 0;
- while (iteratorToRead.hasNext()) {
- read++;
- result.add(iteratorToRead.next());
- if (read == stopAfter) {
- LOG.info("Read {} elements", read);
- return result;
- }
- }
- return result;
- }
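-
- /*
- * Illustrative usage sketch (hypothetical topic name, not part of the original file):
- * drain up to 100 raw messages with a throwaway consumer config and log their offsets.
- */
- private static void dumpFirstHundred(ConsumerConfig config) {
- for (MessageAndMetadata<byte[], byte[]> message : readTopicToList("test-topic", config, 100)) {
- LOG.info("partition={} offset={}", message.partition(), message.offset());
- }
- }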
-
- private static void printTopic(String topicName, ConsumerConfig config,
- DeserializationSchema<?> deserializationSchema,
- int stopAfter) {
-
- List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
- LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-
- for (MessageAndMetadata<byte[], byte[]> message: contents) {
- Object out = deserializationSchema.deserialize(message.message());
- LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
- }
- }
-
- private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer) {
- // write the sequence to log for debugging purposes
- Properties stdProps = standardCC.props().props();
- Properties newProps = new Properties(stdProps);
- newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
- newProps.setProperty("auto.offset.reset", "smallest");
- newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
-
- ConsumerConfig printerConfig = new ConsumerConfig(newProps);
- printTopic(topicName, printerConfig, deserializer, elements);
- }
-
-
- public static class BrokerKillingMapper<T> extends RichMapFunction<T, T>
- implements Checkpointed<Integer>, CheckpointNotifier {
-
- private static final long serialVersionUID = 6334389850158707313L;
-
- public static volatile boolean killedLeaderBefore;
- public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
- private final String leaderToShutDown;
- private final int failCount;
- private int numElementsTotal;
-
- private boolean failer;
- private boolean hasBeenCheckpointed;
-
-
- public BrokerKillingMapper(String leaderToShutDown, int failCount) {
- this.leaderToShutDown = leaderToShutDown;
- this.failCount = failCount;
- }
-
- @Override
- public void open(Configuration parameters) {
- failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
- }
-
- @Override
- public T map(T value) throws Exception {
- numElementsTotal++;
-
- if (!killedLeaderBefore) {
- Thread.sleep(10);
-
- if (failer && numElementsTotal >= failCount) {
- // shut down a Kafka broker
- KafkaServer toShutDown = null;
- for (KafkaServer kafkaServer : brokers) {
- String connectionUrl =
- NetUtils.hostAndPortToUrlString(
- kafkaServer.config().advertisedHostName(),
- kafkaServer.config().advertisedPort());
- if (leaderToShutDown.equals(connectionUrl)) {
- toShutDown = kafkaServer;
- break;
- }
- }
-
- if (toShutDown == null) {
- StringBuilder listOfBrokers = new StringBuilder();
- for (KafkaServer kafkaServer : brokers) {
- listOfBrokers.append(
- NetUtils.hostAndPortToUrlString(
- kafkaServer.config().advertisedHostName(),
- kafkaServer.config().advertisedPort()));
- listOfBrokers.append(" ; ");
- }
-
- throw new Exception("Cannot find broker to shut down: " + leaderToShutDown
- + " ; available brokers: " + listOfBrokers.toString());
- }
- else {
- hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
- killedLeaderBefore = true;
- toShutDown.shutdown();
- }
- }
- }
- return value;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- hasBeenCheckpointed = true;
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsTotal;
- }
-
- @Override
- public void restoreState(Integer state) {
- this.numElementsTotal = state;
- }
- }
-}
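
The BrokerKillingMapper above uses a pattern that recurs throughout these exactly-once tests: a static volatile flag is armed before the failure is induced, so the failure fires only once even though Flink restarts the task within the same test JVM. A minimal JDK-only sketch of the fail-once idiom (class and member names are hypothetical, not part of this commit):

    public static class FailOnceMapper<T> {

        // static + volatile: survives task restarts within one JVM and is
        // visible to the threads running the restarted tasks
        public static volatile boolean failedBefore = false;

        private final int failAtElement;
        private int seen;

        public FailOnceMapper(int failAtElement) {
            this.failAtElement = failAtElement;
        }

        public T map(T value) throws Exception {
            if (!failedBefore && ++seen >= failAtElement) {
                failedBefore = true; // arm first, so the retried execution runs through
                throw new Exception("artificial test failure");
            }
            return value;
        }
    }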
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
deleted file mode 100644
index b4511ce..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import org.junit.Test;
-
-import java.util.Properties;
-
-
-public class KafkaITCase extends KafkaConsumerTestBase {
-
- @Override
- protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
- return new FlinkKafkaConsumer081<>(topic, deserializationSchema, props);
- }
-
- // ------------------------------------------------------------------------
- // Suite of Tests
- // ------------------------------------------------------------------------
-
- @Test
- public void testCheckpointing() throws Exception {
- runCheckpointingTest();
- }
-
- @Test
- public void testOffsetInZookeeper() throws Exception {
- runOffsetInZookeeperValidationTest();
- }
-
- @Test
- public void testConcurrentProducerConsumerTopology() throws Exception {
- runSimpleConcurrentProducerConsumerTopology();
- }
-
- // --- canceling / failures ---
-
- @Test
- public void testCancelingEmptyTopic() throws Exception {
- runCancelingOnEmptyInputTest();
- }
-
- @Test
- public void testCancelingFullTopic() throws Exception {
- runCancelingOnFullInputTest();
- }
-
- @Test
- public void testFailOnDeploy() throws Exception {
- runFailOnDeployTest();
- }
-
- @Test
- public void testInvalidOffset() throws Exception {
- runInvalidOffsetTest();
- }
-
- // --- source to partition mappings and exactly once ---
-
- @Test
- public void testOneToOneSources() throws Exception {
- runOneToOneExactlyOnceTest();
- }
-
- @Test
- public void testOneSourceMultiplePartitions() throws Exception {
- runOneSourceMultiplePartitionsExactlyOnceTest();
- }
-
- @Test
- public void testMultipleSourcesOnePartition() throws Exception {
- runMultipleSourcesOnePartitionExactlyOnceTest();
- }
-
- // --- broker failure ---
-
- @Test
- public void testBrokerFailure() throws Exception {
- runBrokerFailureTest();
- }
-
- // --- special executions ---
-
- @Test
- public void testBigRecordJob() throws Exception {
- runBigRecordTestTopology();
- }
-}
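
KafkaITCase only binds the abstract test suite to the 0.8.1 consumer; covering another broker version needs nothing more than a different getConsumer override. A sketch, assuming a FlinkKafkaConsumer082 variant with the same constructor signature exists:

    public class Kafka082ITCase extends KafkaConsumerTestBase {

        @Override
        protected <T> FlinkKafkaConsumer<T> getConsumer(
                String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
            // only the consumer binding changes; all inherited tests stay the same
            return new FlinkKafkaConsumer082<>(topic, deserializationSchema, props);
        }
    }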
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
deleted file mode 100644
index 72d2772..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
- @Override
- public long milliseconds() {
- return System.currentTimeMillis();
- }
-
- @Override
- public long nanoseconds() {
- return System.nanoTime();
- }
-
- @Override
- public void sleep(long ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e) {
- LOG.warn("Interruption", e);
- }
- }
-
-}
-
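
KafkaLocalSystemTime simply forwards to the system clock; the value of the kafka.utils.Time indirection is that tests can substitute a controllable clock. A sketch of a manually advanced implementation, assuming only the three-method contract shown above:

    public class ManualTime implements kafka.utils.Time {

        private volatile long currentMs = 0;

        /** Moves the clock forward without any real waiting. */
        public void advance(long ms) {
            currentMs += ms;
        }

        @Override
        public long milliseconds() {
            return currentMs;
        }

        @Override
        public long nanoseconds() {
            return currentMs * 1000000L;
        }

        @Override
        public void sleep(long ms) {
            advance(ms); // no blocking in tests, just advance the clock
        }
    }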
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
deleted file mode 100644
index 5001364..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import java.io.Serializable;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class KafkaProducerITCase extends KafkaTestBase {
-
-
- /**
- *
- * <pre>
- * +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
- * / | \
- * / | \
- * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
- * \ | /
- * \ | /
- * +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
- * </pre>
- *
- * The mapper validates that the values come consistently from the correct Kafka partition.
- *
- * The final sink validates that there are no duplicates and that all partitions are present.
- */
- @Test
- public void testCustomPartitioning() {
- try {
- LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
-
- final String topic = "customPartitioningTestTopic";
- final int parallelism = 3;
-
- createTestTopic(topic, parallelism, 1);
-
- TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setNumberOfExecutionRetries(0);
- env.getConfig().disableSysoutLogging();
-
- TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
- TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
- // ------ producing topology ---------
-
- // source runs with parallelism 1 to make sure it generates no duplicates
- DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-
- private boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
- long cnt = 0;
- while (running) {
- ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- })
- .setParallelism(1);
-
- // sink that writes to Kafka, partitioned by the custom partitioner
- stream.addSink(new FlinkKafkaProducer<>(topic, serSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings), new CustomPartitioner(parallelism)))
- .setParallelism(parallelism);
-
- // ------ consuming topology ---------
-
- FlinkKafkaConsumer<Tuple2<Long, String>> source =
- new FlinkKafkaConsumer<>(topic, deserSchema, standardProps,
- FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
- FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-
- env.addSource(source).setParallelism(parallelism)
-
- // mapper that validates partitioning and maps to partition
- .map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-
- private int ourPartition = -1;
- @Override
- public Integer map(Tuple2<Long, String> value) {
- int partition = value.f0.intValue() % parallelism;
- if (ourPartition != -1) {
- assertEquals("inconsistent partitioning", ourPartition, partition);
- } else {
- ourPartition = partition;
- }
- return partition;
- }
- }).setParallelism(parallelism)
-
- .addSink(new SinkFunction<Integer>() {
-
- private int[] valuesPerPartition = new int[parallelism];
-
- @Override
- public void invoke(Integer value) throws Exception {
- valuesPerPartition[value]++;
-
- // the test succeeds once every partition has delivered at least 100 values
- boolean missing = false;
- for (int i : valuesPerPartition) {
- if (i < 100) {
- missing = true;
- break;
- }
- }
- if (!missing) {
- throw new SuccessException();
- }
- }
- }).setParallelism(1);
-
- tryExecute(env, "custom partitioning test");
-
- deleteTestTopic(topic);
-
- LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
- // ------------------------------------------------------------------------
-
- public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
-
- private final int expectedPartitions;
-
- public CustomPartitioner(int expectedPartitions) {
- this.expectedPartitions = expectedPartitions;
- }
-
- @Override
- public int partition(Object key, int numPartitions) {
- @SuppressWarnings("unchecked")
- Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
-
- assertEquals(expectedPartitions, numPartitions);
-
- return (int) (tuple.f0 % numPartitions);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index c5c3387..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-
-import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import static org.junit.Assert.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducer.class)
-public class KafkaProducerTest extends TestLogger {
-
- @Test
- @SuppressWarnings("unchecked")
- public void testPropagateExceptions() {
- try {
- // mock kafka producer
- KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-
- // partition setup
- when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
- Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
- // failure when trying to send an element
- when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
- .thenAnswer(new Answer<Future<RecordMetadata>>() {
- @Override
- public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
- Callback callback = (Callback) invocation.getArguments()[1];
- callback.onCompletion(null, new Exception("Test error"));
- return null;
- }
- });
-
- // make sure the FlinkKafkaProducer instantiates our mock producer
- whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-
- // (1) producer that propagates errors
-
- FlinkKafkaProducer<String> producerPropagating = new FlinkKafkaProducer<String>(
- "mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
-
- producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
- producerPropagating.open(new Configuration());
-
- try {
- producerPropagating.invoke("value");
- producerPropagating.invoke("value");
- fail("This should fail with an exception");
- }
- catch (Exception e) {
- assertNotNull(e.getCause());
- assertNotNull(e.getCause().getMessage());
- assertTrue(e.getCause().getMessage().contains("Test error"));
- }
-
- // (2) producer that only logs errors
-
- FlinkKafkaProducer<String> producerLogging = new FlinkKafkaProducer<String>(
- "mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
- producerLogging.setLogFailuresOnly(true);
-
- producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
- producerLogging.open(new Configuration());
-
- producerLogging.invoke("value");
- producerLogging.invoke("value");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
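
The stubbing trick in testPropagateExceptions, answering an asynchronous send() by invoking the caller's callback with an exception, works for any callback-style API. A self-contained sketch with a hypothetical AsyncSender interface (not a Kafka type), using the same Mockito calls as above:

    interface AsyncSender {
        void send(String record, Callback callback);
    }

    @Test
    public void sketchCallbackFailure() {
        AsyncSender senderMock = mock(AsyncSender.class);

        // complete every send() immediately with an error, on the caller's thread
        doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) {
                Callback callback = (Callback) invocation.getArguments()[1];
                callback.onCompletion(null, new Exception("Test error"));
                return null;
            }
        }).when(senderMock).send(anyString(), any(Callback.class));

        senderMock.send("value", new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                assertNotNull(exception); // the stubbed error arrives here
            }
        });
    }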