Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:25 UTC
[09/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
new file mode 100644
index 0000000..e9a5728
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -0,0 +1,1124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.server.KafkaServer;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.map.LinkedMap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
+import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
+import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.Assert;
+
+import org.junit.Rule;
+import scala.collection.Seq;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public abstract class KafkaConsumerTestBase extends KafkaTestBase {
+
+ @Rule
+ public RetryRule retryRule = new RetryRule();
+
+ // ------------------------------------------------------------------------
+ // Required methods by the abstract test base
+ // ------------------------------------------------------------------------
+
+ protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
+ String topic, DeserializationSchema<T> deserializationSchema, Properties props);
+
+ // ------------------------------------------------------------------------
+ // Suite of Tests
+ //
+ // The tests here are not activated by an @Test annotation, but need
+ // to be invoked from the extending classes. That way, the classes can
+ // select which tests to run.
+ // ------------------------------------------------------------------------
+
+ /**
+ * Tests that checkpointing and checkpoint notification work properly.
+ */
+ public void runCheckpointingTest() throws Exception {
+ createTestTopic("testCheckpointing", 1, 1);
+
+ FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
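+ // use reflection to inspect the consumer's internal pendingCheckpoints map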
+ Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
+ pendingCheckpointsField.setAccessible(true);
+ LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+
+ Assert.assertEquals(0, pendingCheckpoints.size());
+ source.setRuntimeContext(new MockRuntimeContext(1, 0));
+
+ final long[] initialOffsets = new long[] { 1337 };
+
+ // first restore
+ source.restoreState(initialOffsets);
+
+ // then open
+ source.open(new Configuration());
+ long[] state1 = source.snapshotState(1, 15);
+
+ assertArrayEquals(initialOffsets, state1);
+
+ long[] state2 = source.snapshotState(2, 30);
+ Assert.assertArrayEquals(initialOffsets, state2);
+ Assert.assertEquals(2, pendingCheckpoints.size());
+
+ source.notifyCheckpointComplete(1);
+ Assert.assertEquals(1, pendingCheckpoints.size());
+
+ source.notifyCheckpointComplete(2);
+ Assert.assertEquals(0, pendingCheckpoints.size());
+
+ source.notifyCheckpointComplete(666); // invalid checkpoint
+ Assert.assertEquals(0, pendingCheckpoints.size());
+
+ // create 500 snapshots
+ for (int i = 100; i < 600; i++) {
+ source.snapshotState(i, 15 * i);
+ }
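+ // the pending checkpoints map is capped at MAX_NUM_PENDING_CHECKPOINTS; the oldest entries are dropped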
+ Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+
+ // commit only the second last
+ source.notifyCheckpointComplete(598);
+ Assert.assertEquals(1, pendingCheckpoints.size());
+
+ // access invalid checkpoint
+ source.notifyCheckpointComplete(590);
+
+ // and the last
+ source.notifyCheckpointComplete(599);
+ Assert.assertEquals(0, pendingCheckpoints.size());
+
+ source.close();
+
+ deleteTestTopic("testCheckpointing");
+ }
+
+ /**
+ * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
+ *
+ * This test is only applicable if the Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
+ */
+ public void runOffsetInZookeeperValidationTest() throws Exception {
+ LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
+
+ final String topicName = "testOffsetHacking";
+ final int parallelism = 3;
+
+ createTestTopic(topicName, parallelism, 1);
+
+ StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env1.getConfig().disableSysoutLogging();
+ env1.enableCheckpointing(50);
+ env1.setNumberOfExecutionRetries(0);
+ env1.setParallelism(parallelism);
+
+ StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env2.getConfig().disableSysoutLogging();
+ env2.enableCheckpointing(50);
+ env2.setNumberOfExecutionRetries(0);
+ env2.setParallelism(parallelism);
+
+ StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env3.getConfig().disableSysoutLogging();
+ env3.enableCheckpointing(50);
+ env3.setNumberOfExecutionRetries(0);
+ env3.setParallelism(parallelism);
+
+ // write a sequence from 0 to 99 to each of the 3 partitions.
+ writeSequence(env1, topicName, 100, parallelism);
+
+ readSequence(env2, standardProps, parallelism, topicName, 100, 0);
+
+ ZkClient zkClient = createZookeeperClient();
+
+ long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
+ long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
+ long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
+
+ LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+
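+ // a partition may not have had an offset committed yet, so OFFSET_NOT_SET is also acceptable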
+ assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+ assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
+ assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));
+
+ LOG.info("Manipulating offsets");
+
+ // set the committed offset to 49 for the three partitions, so that reading resumes at element 50
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
+
+ zkClient.close();
+
+ // read the remaining 50 elements, resuming from the manipulated offsets
+ readSequence(env3, standardProps, parallelism, topicName, 50, 50);
+
+ deleteTestTopic(topicName);
+
+ LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
+ }
+
+ /**
+ * Ensure Kafka is working on both producer and consumer side.
+ * This executes a job that contains two Flink pipelines.
+ *
+ * <pre>
+ * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
+ * </pre>
+ *
+ * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
+ * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
+ * cause the test to fail.
+ */
+ @RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
+ public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
+ LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
+
+ final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
+ final int parallelism = 3;
+ final int elementsPerPartition = 100;
+ final int totalElements = parallelism * elementsPerPartition;
+
+ createTestTopic(topic, parallelism, 2);
+
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(parallelism);
+ env.getConfig().disableSysoutLogging();
+
+ TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
+
+ TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
+ new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+
+ TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
+ new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+
+ // ----------- add producer dataflow ----------
+
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
+
+ private boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, String>> ctx) {
+ int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
+ int limit = cnt + elementsPerPartition;
+
+
+ while (running && cnt < limit) {
+ ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
+ cnt++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+ stream.addSink(new FlinkKafkaProducer<>(brokerConnectionStrings, topic, sinkSchema));
+
+ // ----------- add consumer dataflow ----------
+
+ FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
+
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
+
+ consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+
+ private int elCnt = 0;
+ private BitSet validator = new BitSet(totalElements);
+
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ assertFalse("Received tuple twice", validator.get(v));
+ validator.set(v);
+ elCnt++;
+
+ if (elCnt == totalElements) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != totalElements) {
+ fail("The bitset was not set to 1 on all elements. Next clear:"
+ + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+ }).setParallelism(1);
+
+ try {
+ tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
+ }
+ catch (ProgramInvocationException | JobExecutionException e) {
+ // look for NotLeaderForPartitionException
+ Throwable cause = e.getCause();
+
+ // search for a nested NotLeaderForPartitionException
+ int depth = 0;
+ while (cause != null && depth++ < 20) {
+ if (cause instanceof kafka.common.NotLeaderForPartitionException) {
+ throw (Exception) cause;
+ }
+ cause = cause.getCause();
+ }
+ throw e;
+ }
+
+ LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Tests the proper consumption when having a 1:1 correspondence between Kafka partitions and
+ * Flink sources.
+ */
+ public void runOneToOneExactlyOnceTest() throws Exception {
+ LOG.info("Starting runOneToOneExactlyOnceTest()");
+
+ final String topic = "oneToOneTopic";
+ final int parallelism = 5;
+ final int numElementsPerPartition = 1000;
+ final int totalElements = parallelism * numElementsPerPartition;
+ final int failAfterElements = numElementsPerPartition / 3;
+
+ createTestTopic(topic, parallelism, 1);
+
+ DataGenerators.generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+ brokerConnectionStrings,
+ topic, parallelism, numElementsPerPartition, true);
+
+ // run the topology that fails and recovers
+
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+ env
+ .addSource(kafkaSource)
+ .map(new PartitionValidatingMapper(parallelism, 1))
+ .map(new FailingIdentityMapper<Integer>(failAfterElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+ FailingIdentityMapper.failedBefore = false;
+ tryExecute(env, "One-to-one exactly once test");
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
+ * one Flink source will read multiple Kafka partitions.
+ */
+ public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
+ LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
+
+ final String topic = "oneToManyTopic";
+ final int numPartitions = 5;
+ final int numElementsPerPartition = 1000;
+ final int totalElements = numPartitions * numElementsPerPartition;
+ final int failAfterElements = numElementsPerPartition / 3;
+
+ final int parallelism = 2;
+
+ createTestTopic(topic, numPartitions, 1);
+
+ DataGenerators.generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+ brokerConnectionStrings,
+ topic, numPartitions, numElementsPerPartition, true);
+
+ // run the topology that fails and recovers
+
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+ env
+ .addSource(kafkaSource)
+ .map(new PartitionValidatingMapper(numPartitions, 3))
+ .map(new FailingIdentityMapper<Integer>(failAfterElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+ FailingIdentityMapper.failedBefore = false;
+ tryExecute(env, "One-source-multi-partitions exactly once test");
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
+ * that some Flink sources will read no partitions.
+ */
+ public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
+ LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
+
+ final String topic = "manyToOneTopic";
+ final int numPartitions = 5;
+ final int numElementsPerPartition = 1000;
+ final int totalElements = numPartitions * numElementsPerPartition;
+ final int failAfterElements = numElementsPerPartition / 3;
+
+ final int parallelism = 8;
+
+ createTestTopic(topic, numPartitions, 1);
+
+ DataGenerators.generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+ brokerConnectionStrings,
+ topic, numPartitions, numElementsPerPartition, true);
+
+ // run the topology that fails and recovers
+
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+ env.setBufferTimeout(0);
+
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+ env
+ .addSource(kafkaSource)
+ .map(new PartitionValidatingMapper(numPartitions, 1))
+ .map(new FailingIdentityMapper<Integer>(failAfterElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+ FailingIdentityMapper.failedBefore = false;
+ tryExecute(env, "multi-source-one-partitions exactly once test");
+
+
+ deleteTestTopic(topic);
+ }
+
+
+ /**
+ * Tests that the source can be properly canceled when reading full partitions.
+ */
+ public void runCancelingOnFullInputTest() throws Exception {
+ final String topic = "cancelingOnFullTopic";
+
+ final int parallelism = 3;
+ createTestTopic(topic, parallelism, 1);
+
+ // launch a producer thread
+ DataGenerators.InfiniteStringsGenerator generator =
+ new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
+ generator.start();
+
+ // launch a consumer asynchronously
+
+ final AtomicReference<Throwable> jobError = new AtomicReference<>();
+
+ final Runnable jobRunner = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(100);
+ env.getConfig().disableSysoutLogging();
+
+ FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+
+ env.addSource(source).addSink(new DiscardingSink<String>());
+
+ env.execute();
+ }
+ catch (Throwable t) {
+ jobError.set(t);
+ }
+ }
+ };
+
+ Thread runnerThread = new Thread(jobRunner, "program runner thread");
+ runnerThread.start();
+
+ // wait a bit before canceling
+ Thread.sleep(2000);
+
+ // cancel
+ JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+ // wait for the program to be done and validate that we failed with the right exception
+ runnerThread.join();
+
+ Throwable failureCause = jobError.get();
+ assertNotNull("program did not fail properly due to canceling", failureCause);
+ assertTrue(failureCause.getMessage().contains("Job was cancelled"));
+
+ if (generator.isAlive()) {
+ generator.shutdown();
+ generator.join();
+ }
+ else {
+ Throwable t = generator.getError();
+ if (t != null) {
+ t.printStackTrace();
+ fail("Generator failed: " + t.getMessage());
+ } else {
+ fail("Generator failed with no exception");
+ }
+ }
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Tests that the source can be properly canceled when reading empty partitions.
+ */
+ public void runCancelingOnEmptyInputTest() throws Exception {
+ final String topic = "cancelingOnEmptyInputTopic";
+
+ final int parallelism = 3;
+ createTestTopic(topic, parallelism, 1);
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final Runnable jobRunner = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(100);
+ env.getConfig().disableSysoutLogging();
+
+ FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+
+ env.addSource(source).addSink(new DiscardingSink<String>());
+
+ env.execute();
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+
+ Thread runnerThread = new Thread(jobRunner, "program runner thread");
+ runnerThread.start();
+
+ // wait a bit before canceling
+ Thread.sleep(2000);
+
+ // cancel
+ JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+ // wait for the program to be done and validate that we failed with the right exception
+ runnerThread.join();
+
+ Throwable failureCause = error.get();
+ assertNotNull("program did not fail properly due to canceling", failureCause);
+ assertTrue(failureCause.getMessage().contains("Job was cancelled"));
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Tests that the job fails on deployment when the requested parallelism exceeds the number of available slots.
+ */
+ public void runFailOnDeployTest() throws Exception {
+ final String topic = "failOnDeployTopic";
+
+ createTestTopic(topic, 2, 1);
+
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(12); // needs to be more than the mini cluster has slots
+ env.getConfig().disableSysoutLogging();
+
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+ env
+ .addSource(kafkaSource)
+ .addSink(new DiscardingSink<Integer>());
+
+ try {
+ env.execute();
+ fail("this test should fail with an exception");
+ }
+ catch (ProgramInvocationException e) {
+
+ // validate that we failed due to a NoResourceAvailableException
+ Throwable cause = e.getCause();
+ int depth = 0;
+ boolean foundResourceException = false;
+
+ while (cause != null && depth++ < 20) {
+ if (cause instanceof NoResourceAvailableException) {
+ foundResourceException = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ assertTrue("Wrong exception", foundResourceException);
+ }
+
+ deleteTestTopic(topic);
+ }
+
+ public void runInvalidOffsetTest() throws Exception {
+ final String topic = "invalidOffsetTopic";
+ final int parallelism = 1;
+
+ // create topic
+ createTestTopic(topic, parallelism, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+
+ // write 20 messages into topic:
+ writeSequence(env, topic, 20, parallelism);
+
+ // set invalid offset:
+ ZkClient zkClient = createZookeeperClient();
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234);
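+ // offset 1234 lies beyond the 20 written records; the consumer is expected to fall back and read the full sequence from the start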
+
+ // read from topic
+ final int valuesCount = 20;
+ final int startFrom = 0;
+ readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Tests Flink's Kafka integration with very big records (30 MB);
+ * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+ */
+ public void runBigRecordTestTopology() throws Exception {
+ LOG.info("Starting runBigRecordTestTopology()");
+
+ final String topic = "bigRecordTestTopic";
+ final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
+
+ createTestTopic(topic, parallelism, 1);
+
+ final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+
+ final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
+ new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+
+ final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
+ new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setNumberOfExecutionRetries(0);
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(100);
+ env.setParallelism(parallelism);
+
+ // add consuming topology:
+ Properties consumerProps = new Properties();
+ consumerProps.putAll(standardProps);
+ consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
+ consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
+ consumerProps.setProperty("queued.max.message.chunks", "1");
+
+ FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
+ DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
+
+ consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+
+ private int elCnt = 0;
+
+ @Override
+ public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+ elCnt++;
+ if (value.f0 == -1) {
+ // we should have seen 11 elements now.
+ if (elCnt == 11) {
+ throw new SuccessException();
+ } else {
+ throw new RuntimeException("There have been " + elCnt + " elements");
+ }
+ }
+ if (elCnt > 10) {
+ throw new RuntimeException("More than 10 elements seen: " + elCnt);
+ }
+ }
+ });
+
+ // add producing topology
+ Properties producerProps = new Properties();
+ producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
+ producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
+
+ DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+
+ private boolean running;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ running = true;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
+ Random rnd = new Random();
+ long cnt = 0;
+ int fifteenMb = 1024 * 1024 * 15;
+
+ while (running) {
+ byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
+ ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+
+ Thread.sleep(100);
+
+ if (cnt == 10) {
+ // signal end
+ ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ stream.addSink(new FlinkKafkaProducer<>(topic, deserSchema, producerProps));
+
+ tryExecute(env, "big topology test");
+
+ deleteTestTopic(topic);
+
+ LOG.info("Finished runBigRecordTestTopology()");
+ }
+
+
+ public void runBrokerFailureTest() throws Exception {
+ LOG.info("starting runBrokerFailureTest()");
+
+ final String topic = "brokerFailureTestTopic";
+
+ final int parallelism = 2;
+ final int numElementsPerPartition = 1000;
+ final int totalElements = parallelism * numElementsPerPartition;
+ final int failAfterElements = numElementsPerPartition / 3;
+
+
+ createTestTopic(topic, parallelism, 2);
+
+ DataGenerators.generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+ brokerConnectionStrings,
+ topic, parallelism, numElementsPerPartition, true);
+
+ // find leader to shut down
+ ZkClient zkClient = createZookeeperClient();
+ PartitionMetadata firstPart = null;
+ do {
+ if (firstPart != null) {
+ LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+ // not the first try. Sleep a bit
+ Thread.sleep(150);
+ }
+
+ Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+ firstPart = partitionMetadata.head();
+ }
+ while (firstPart.errorCode() != 0);
+ zkClient.close();
+
+ final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
+ final String leaderToShutDownConnection =
+ NetUtils.hostAndPortToUrlString(leaderToShutDown.host(), leaderToShutDown.port());
+
+
+ final int leaderIdToShutDown = firstPart.leader().get().id();
+ LOG.info("Leader to shutdown {}", leaderToShutDown);
+
+
+ // run the topology that fails and recovers
+
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(500);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+ env
+ .addSource(kafkaSource)
+ .map(new PartitionValidatingMapper(parallelism, 1))
+ .map(new BrokerKillingMapper<Integer>(leaderToShutDownConnection, failAfterElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+ BrokerKillingMapper.killedLeaderBefore = false;
+ tryExecute(env, "One-to-one exactly once test");
+
+ // start a new broker:
+ brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
+
+ LOG.info("finished runBrokerFailureTest()");
+ }
+
+ // ------------------------------------------------------------------------
+ // Reading / writing test data sets
+ // ------------------------------------------------------------------------
+
+ private void readSequence(StreamExecutionEnvironment env, Properties cc,
+ final int sourceParallelism,
+ final String topicName,
+ final int valuesCount, final int startFrom) throws Exception {
+
+ final int finalCount = valuesCount * sourceParallelism;
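+ // each distinct value is expected to be seen exactly sourceParallelism times (once per partition)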
+
+ final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+ final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
+ new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
+
+ // create the consumer
+ FlinkKafkaConsumer<Tuple2<Integer, Integer>> consumer = getConsumer(topicName, deser, cc);
+
+ DataStream<Tuple2<Integer, Integer>> source = env
+ .addSource(consumer).setParallelism(sourceParallelism)
+ .map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
+
+ // verify data
+ source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+
+ private int[] values = new int[valuesCount];
+ private int count = 0;
+
+ @Override
+ public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+ values[value.f1 - startFrom]++;
+ count++;
+
+ // verify if we've seen everything
+ if (count == finalCount) {
+ for (int i = 0; i < values.length; i++) {
+ int v = values[i];
+ if (v != sourceParallelism) {
+ printTopic(topicName, valuesCount, deser);
+ throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+ }
+ }
+ // test has passed
+ throw new SuccessException();
+ }
+ }
+
+ }).setParallelism(1);
+
+ tryExecute(env, "Read data from Kafka");
+
+ LOG.info("Successfully read sequence for verification");
+ }
+
+ private static void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
+
+ TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+ DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+ private boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+ int cnt = 0;
+ int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+ while (running && cnt < numElements) {
+ ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+ cnt++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }).setParallelism(parallelism);
+
+ stream.addSink(new FlinkKafkaProducer<>(topicName,
+ new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
+ FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings),
+ new Tuple2Partitioner(parallelism)
+ )).setParallelism(parallelism);
+
+ env.execute("Write sequence");
+
+ LOG.info("Finished writing sequence");
+ }
+
+ // ------------------------------------------------------------------------
+ // Debugging utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Reads a topic into a list, using only Kafka code.
+ */
+ private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+ ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+ // we request only one stream per consumer instance. Kafka will make sure that each consumer group
+ // will see each message only once.
+ Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+ Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+ if (streams.size() != 1) {
+ throw new RuntimeException("Expected only one message stream but got " + streams.size());
+ }
+ List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+ if (kafkaStreams == null) {
+ throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
+ }
+ if (kafkaStreams.size() != 1) {
+ throw new RuntimeException("Requested 1 stream from Kafka, but got " + kafkaStreams.size() + " streams");
+ }
+ LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+ ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+ List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
+ int read = 0;
+ while (iteratorToRead.hasNext()) {
+ read++;
+ result.add(iteratorToRead.next());
+ if (read == stopAfter) {
+ LOG.info("Read {} elements", read);
+ return result;
+ }
+ }
+ return result;
+ }
+
+ private static void printTopic(String topicName, ConsumerConfig config,
+ DeserializationSchema<?> deserializationSchema,
+ int stopAfter) {
+
+ List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+ LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
+
+ for (MessageAndMetadata<byte[], byte[]> message: contents) {
+ Object out = deserializationSchema.deserialize(message.message());
+ LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+ }
+ }
+
+ private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer) {
+ // write the sequence to log for debugging purposes
+ Properties stdProps = standardCC.props().props();
+ Properties newProps = new Properties(stdProps);
+ newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
+ newProps.setProperty("auto.offset.reset", "smallest");
+ newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+ ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+ printTopic(topicName, printerConfig, deserializer, elements);
+ }
+
+
+ public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
+ implements Checkpointed<Integer>, CheckpointNotifier {
+
+ private static final long serialVersionUID = 6334389850158707313L;
+
+ public static volatile boolean killedLeaderBefore;
+ public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+ private final String leaderToShutDown;
+ private final int failCount;
+ private int numElementsTotal;
+
+ private boolean failer;
+ private boolean hasBeenCheckpointed;
+
+
+ public BrokerKillingMapper(String leaderToShutDown, int failCount) {
+ this.leaderToShutDown = leaderToShutDown;
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
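+ // only the first subtask triggers the broker shutdown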
+ failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+
+ if (!killedLeaderBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ // shut down a Kafka broker
+ KafkaServer toShutDown = null;
+ for (KafkaServer kafkaServer : brokers) {
+ String connectionUrl =
+ NetUtils.hostAndPortToUrlString(
+ kafkaServer.config().advertisedHostName(),
+ kafkaServer.config().advertisedPort());
+ if (leaderToShutDown.equals(connectionUrl)) {
+ toShutDown = kafkaServer;
+ break;
+ }
+ }
+
+ if (toShutDown == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer kafkaServer : brokers) {
+ listOfBrokers.append(
+ NetUtils.hostAndPortToUrlString(
+ kafkaServer.config().advertisedHostName(),
+ kafkaServer.config().advertisedPort()));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new Exception("Cannot find broker to shut down: " + leaderToShutDown
+ + " ; available brokers: " + listOfBrokers.toString());
+ }
+ else {
+ hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+ killedLeaderBefore = true;
+ toShutDown.shutdown();
+ }
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ hasBeenCheckpointed = true;
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return numElementsTotal;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ this.numElementsTotal = state;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
new file mode 100644
index 0000000..b4511ce
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+
+public class KafkaITCase extends KafkaConsumerTestBase {
+
+ @Override
+ protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+ return new FlinkKafkaConsumer081<>(topic, deserializationSchema, props);
+ }
+
+ // ------------------------------------------------------------------------
+ // Suite of Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCheckpointing() throws Exception {
+ runCheckpointingTest();
+ }
+
+ @Test
+ public void testOffsetInZookeeper() throws Exception {
+ runOffsetInZookeeperValidationTest();
+ }
+
+ @Test
+ public void testConcurrentProducerConsumerTopology() throws Exception {
+ runSimpleConcurrentProducerConsumerTopology();
+ }
+
+ // --- canceling / failures ---
+
+ @Test
+ public void testCancelingEmptyTopic() throws Exception {
+ runCancelingOnEmptyInputTest();
+ }
+
+ @Test
+ public void testCancelingFullTopic() throws Exception {
+ runCancelingOnFullInputTest();
+ }
+
+ @Test
+ public void testFailOnDeploy() throws Exception {
+ runFailOnDeployTest();
+ }
+
+ @Test
+ public void testInvalidOffset() throws Exception {
+ runInvalidOffsetTest();
+ }
+
+ // --- source to partition mappings and exactly once ---
+
+ @Test
+ public void testOneToOneSources() throws Exception {
+ runOneToOneExactlyOnceTest();
+ }
+
+ @Test
+ public void testOneSourceMultiplePartitions() throws Exception {
+ runOneSourceMultiplePartitionsExactlyOnceTest();
+ }
+
+ @Test
+ public void testMultipleSourcesOnePartition() throws Exception {
+ runMultipleSourcesOnePartitionExactlyOnceTest();
+ }
+
+ // --- broker failure ---
+
+ @Test
+ public void testBrokerFailure() throws Exception {
+ runBrokerFailureTest();
+ }
+
+ // --- special executions ---
+
+ @Test
+ public void testBigRecordJob() throws Exception {
+ runBigRecordTestTopology();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..72d2772
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaLocalSystemTime implements Time {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+ @Override
+ public long milliseconds() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long nanoseconds() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ LOG.warn("Interruption", e);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
new file mode 100644
index 0000000..5001364
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class KafkaProducerITCase extends KafkaTestBase {
+
+
+ /**
+ *
+ * <pre>
+ *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
+ *            /                  |                                       \
+ *           /                   |                                        \
+ * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
+ *           \                   |                                        /
+ *            \                  |                                       /
+ *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
+ * </pre>
+ *
+ * The mapper validates that the values come consistently from the correct Kafka partition.
+ *
+ * The final sink validates that there are no duplicates and that all partitions are present.
+ */
+ @Test
+ public void testCustomPartitioning() {
+ try {
+ LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
+
+ final String topic = "customPartitioningTestTopic";
+ final int parallelism = 3;
+
+ createTestTopic(topic, parallelism, 1);
+
+ TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setNumberOfExecutionRetries(0);
+ env.getConfig().disableSysoutLogging();
+
+ TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
+ new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+
+ TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
+ new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+
+ // ------ producing topology ---------
+
+ // source has DOP 1 to make sure it generates no duplicates
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+
+ private boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
+ long cnt = 0;
+ while (running) {
+ ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
+ cnt++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ })
+ .setParallelism(1);
+
+ // sink that writes into the topic, partitioned by the custom partitioner
+ stream.addSink(new FlinkKafkaProducer<>(topic, serSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings), new CustomPartitioner(parallelism)))
+ .setParallelism(parallelism);
+
+ // ------ consuming topology ---------
+
+ FlinkKafkaConsumer<Tuple2<Long, String>> source =
+ new FlinkKafkaConsumer<>(topic, deserSchema, standardProps,
+ FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
+ FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
+
+ env.addSource(source).setParallelism(parallelism)
+
+ // mapper that validates partitioning and maps to partition
+ .map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
+
+ private int ourPartition = -1;
+ @Override
+ public Integer map(Tuple2<Long, String> value) {
+ int partition = value.f0.intValue() % parallelism;
+ if (ourPartition != -1) {
+ assertEquals("inconsistent partitioning", ourPartition, partition);
+ } else {
+ ourPartition = partition;
+ }
+ return partition;
+ }
+ }).setParallelism(parallelism)
+
+ .addSink(new SinkFunction<Integer>() {
+
+ private int[] valuesPerPartition = new int[parallelism];
+
+ @Override
+ public void invoke(Integer value) throws Exception {
+ valuesPerPartition[value]++;
+
+ boolean missing = false;
+ for (int i : valuesPerPartition) {
+ if (i < 100) {
+ missing = true;
+ break;
+ }
+ }
+ if (!missing) {
+ throw new SuccessException();
+ }
+ }
+ }).setParallelism(1);
+
+ tryExecute(env, "custom partitioning test");
+
+ deleteTestTopic(topic);
+
+ LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ // ------------------------------------------------------------------------
+
+ public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
+
+ private final int expectedPartitions;
+
+ public CustomPartitioner(int expectedPartitions) {
+ this.expectedPartitions = expectedPartitions;
+ }
+
+ @Override
+ public int partition(Object key, int numPartitions) {
+ @SuppressWarnings("unchecked")
+ Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
+
+ assertEquals(expectedPartitions, numPartitions);
+
+ return (int) (tuple.f0 % numPartitions);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..c5c3387
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducer.class)
+public class KafkaProducerTest extends TestLogger {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPropagateExceptions() {
+ try {
+ // mock kafka producer
+ KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+
+ // partition setup
+ when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+ Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+ // failure when trying to send an element
+ when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+ .thenAnswer(new Answer<Future<RecordMetadata>>() {
+ @Override
+ public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+ Callback callback = (Callback) invocation.getArguments()[1];
+ callback.onCompletion(null, new Exception("Test error"));
+ return null;
+ }
+ });
+
+ // make sure the FlinkKafkaProducer instantiates our mock producer
+ whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+
+ // (1) producer that propagates errors
+
+ FlinkKafkaProducer<String> producerPropagating = new FlinkKafkaProducer<String>(
+ "mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
+
+ producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
+ producerPropagating.open(new Configuration());
+
+ try {
+ producerPropagating.invoke("value");
+ producerPropagating.invoke("value");
+ fail("This should fail with an exception");
+ }
+ catch (Exception e) {
+ assertNotNull(e.getCause());
+ assertNotNull(e.getCause().getMessage());
+ assertTrue(e.getCause().getMessage().contains("Test error"));
+ }
+
+ // (2) producer that only logs errors
+
+ FlinkKafkaProducer<String> producerLogging = new FlinkKafkaProducer<String>(
+ "mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
+ producerLogging.setLogFailuresOnly(true);
+
+ producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
+ producerLogging.open(new Configuration());
+
+ producerLogging.invoke("value");
+ producerLogging.invoke("value");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
new file mode 100644
index 0000000..d511796
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.consumer.ConsumerConfig;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.common.PartitionInfo;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * The base for the Kafka tests. It brings up:
+ * <ul>
+ * <li>A ZooKeeper mini cluster</li>
+ * <li>Three Kafka Brokers (mini clusters)</li>
+ * <li>A Flink mini cluster</li>
+ * </ul>
+ *
+ * <p>Code in this test is based on the following GitHub repository:
+ * <a href="https://github.com/sakserv/hadoop-mini-clusters">
+ * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
+ * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
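+ *
+ * <p>A minimal sketch of how a concrete test typically uses this base (the topic and job names
+ * below are made up for illustration; the helpers themselves are defined further down):</p>
+ * <pre>{@code
+ * public class MyKafkaITCase extends KafkaTestBase {
+ *
+ *     @Test
+ *     public void runMyTest() throws Exception {
+ *         createTestTopic("myTestTopic", 1, 1);
+ *
+ *         StreamExecutionEnvironment env =
+ *                 StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ *         // ... add sources/sinks that read from or write to "myTestTopic" ...
+ *
+ *         tryExecute(env, "my test job");
+ *         deleteTestTopic("myTestTopic");
+ *     }
+ * }
+ * }</pre>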
+ */
+@SuppressWarnings("serial")
+public abstract class KafkaTestBase extends TestLogger {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
+
+ protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
+
+ protected static String zookeeperConnectionString;
+
+ protected static File tmpZkDir;
+
+ protected static File tmpKafkaParent;
+
+ protected static TestingServer zookeeper;
+ protected static List<KafkaServer> brokers;
+ protected static String brokerConnectionStrings = "";
+
+ protected static ConsumerConfig standardCC;
+ protected static Properties standardProps;
+
+ protected static ForkableFlinkMiniCluster flink;
+
+ protected static int flinkPort;
+
+ protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+ protected static List<File> tmpKafkaDirs;
+
+ protected static String kafkaHost = "localhost";
+
+ // ------------------------------------------------------------------------
+ // Setup and teardown of the mini clusters
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void prepare() throws IOException {
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting KafkaITCase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ LOG.info("Starting KafkaITCase.prepare()");
+
+ File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+ tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+ assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+ tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+ assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+ tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
+ for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+ File tmpDir = new File(tmpKafkaParent, "server-" + i);
+ assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+ tmpKafkaDirs.add(tmpDir);
+ }
+
+ zookeeper = null;
+ brokers = null;
+
+ try {
+ LOG.info("Starting Zookeeper");
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString = zookeeper.getConnectString();
+
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
+
+ for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+ brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), kafkaHost, zookeeperConnectionString));
+ SocketServer socketServer = brokers.get(i).socketServer();
+
+ String host = socketServer.host() == null ? "localhost" : socketServer.host();
+ brokerConnectionStrings += hostAndPortToUrlString(host, socketServer.port()) + ",";
+ }
+
+ LOG.info("ZK and KafkaServer started.");
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ fail("Test setup failed: " + t.getMessage());
+ }
+
+ standardProps = new Properties();
+
+ standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ standardProps.setProperty("bootstrap.servers", brokerConnectionStrings);
+ standardProps.setProperty("group.id", "flink-tests");
+ standardProps.setProperty("auto.commit.enable", "false");
+ standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
+ standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
+ standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+ standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
+ Properties consumerConfigProps = new Properties();
+ consumerConfigProps.putAll(standardProps);
+ consumerConfigProps.setProperty("auto.offset.reset", "smallest");
+ standardCC = new ConsumerConfig(consumerConfigProps);
+
+ // also start a reusable Flink mini cluster
+
+ Configuration flinkConfig = new Configuration();
+ flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+ flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+ flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
+
+ flink = new ForkableFlinkMiniCluster(flinkConfig, false, StreamingMode.STREAMING);
+ flink.start();
+
+ flinkPort = flink.getLeaderRPCPort();
+ }
+
+ @AfterClass
+ public static void shutDownServices() {
+
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Shut down KafkaITCase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ flinkPort = -1;
+ if (flink != null) {
+ flink.shutdown();
+ }
+
+ if (brokers != null) {
+ for (KafkaServer broker : brokers) {
+ if (broker != null) {
+ broker.shutdown();
+ }
+ }
+ brokers.clear();
+ }
+
+ if (zookeeper != null) {
+ try {
+ zookeeper.stop();
+ }
+ catch (Exception e) {
+ LOG.warn("ZK.stop() failed", e);
+ }
+ zookeeper = null;
+ }
+
+ // clean up the temp spaces
+
+ if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+ try {
+ FileUtils.deleteDirectory(tmpKafkaParent);
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+ if (tmpZkDir != null && tmpZkDir.exists()) {
+ try {
+ FileUtils.deleteDirectory(tmpZkDir);
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" KafkaITCase finished");
+ LOG.info("-------------------------------------------------------------------------");
+ }
+
+ /**
+ * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+ */
+ protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
+ String kafkaHost,
+ String zookeeperConnectionString) throws Exception {
+ Properties kafkaProperties = new Properties();
+
+ // properties have to be Strings
+ kafkaProperties.put("advertised.host.name", kafkaHost);
+ kafkaProperties.put("broker.id", Integer.toString(brokerId));
+ kafkaProperties.put("log.dir", tmpFolder.toString());
+ kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+ kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+ kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+ // for CI stability, increase zookeeper session timeout
+ kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+
+ final int numTries = 5;
+
+ for (int i = 1; i <= numTries; i++) {
+ int kafkaPort = NetUtils.getAvailablePort();
+ kafkaProperties.put("port", Integer.toString(kafkaPort));
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+ try {
+ KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
+ server.startup();
+ return server;
+ }
+ catch (KafkaException e) {
+ if (e.getCause() instanceof BindException) {
+ // port conflict, retry...
+ LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+ }
+ else {
+ throw e;
+ }
+ }
+ }
+
+ throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+ }
+
+ // ------------------------------------------------------------------------
+ // Execution utilities
+ // ------------------------------------------------------------------------
+
+ protected ZkClient createZookeeperClient() {
+ return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+ standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+ }
+
+ protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+ try {
+ see.execute(name);
+ }
+ catch (ProgramInvocationException | JobExecutionException root) {
+ Throwable cause = root.getCause();
+
+ // search for nested SuccessExceptions
+ int depth = 0;
+ while (!(cause instanceof SuccessException)) {
+ if (cause == null || depth++ == 20) {
+ root.printStackTrace();
+ fail("Test failed: " + root.getMessage());
+ }
+ else {
+ cause = cause.getCause();
+ }
+ }
+ }
+ }
+
+ protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
+ try {
+ see.execute(name);
+ }
+ catch (ProgramInvocationException | JobExecutionException root) {
+ Throwable cause = root.getCause();
+
+ // search for nested SuccessExceptions
+ int depth = 0;
+ while (!(cause instanceof SuccessException)) {
+ if (cause == null || depth++ == 20) {
+ throw root;
+ }
+ else {
+ cause = cause.getCause();
+ }
+ }
+ }
+ }
+
+ protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+
+ // create topic with one client
+ Properties topicConfig = new Properties();
+ LOG.info("Creating topic {}", topic);
+
+ ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+ standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+
+ AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
+ creator.close();
+
+ // validate that the topic has been created
+ final long deadline = System.currentTimeMillis() + 30000;
+ do {
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ // restore interrupted state
+ Thread.currentThread().interrupt();
+ }
+ List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps);
+ if (partitions != null && partitions.size() > 0) {
+ return;
+ }
+ }
+ while (System.currentTimeMillis() < deadline);
+ fail ("Test topic could not be created");
+ }
+
+ protected static void deleteTestTopic(String topic) {
+ LOG.info("Deleting topic {}", topic);
+
+ ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+ standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+
+ AdminUtils.deleteTopic(zk, topic);
+
+ zk.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
new file mode 100644
index 0000000..75fdd46
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
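+/**
+ * Tests for the {@link FixedPartitioner}. As the diagrams below illustrate, the behavior these
+ * assertions expect is that each parallel sink instance is bound at {@code open()} time to a
+ * single Kafka partition, effectively {@code partitions[subtaskIndex % partitions.length]}
+ * (an interpretation inferred from the assertions, not stated elsewhere in this commit).
+ */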
+public class TestFixedPartitioner {
+
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ */
+ @Test
+ public void testMoreFlinkThanBrokers() {
+ FixedPartitioner part = new FixedPartitioner();
+
+ int[] partitions = new int[]{0};
+
+ part.open(0, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+ part.open(1, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc2", partitions.length));
+
+ part.open(2, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc3", partitions.length));
+ Assert.assertEquals(0, part.partition("abc3", partitions.length)); // check if it is changing ;)
+
+ part.open(3, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc4", partitions.length));
+ }
+
+ /**
+ *
+ * <pre>
+ * Flink Sinks:          Kafka Partitions
+ *   1 ----------------> 1
+ *   2 ----------------> 2
+ *                       3
+ *                       4
+ *                       5
+ *
+ * </pre>
+ */
+ @Test
+ public void testFewerPartitions() {
+ FixedPartitioner part = new FixedPartitioner();
+
+ int[] partitions = new int[]{0, 1, 2, 3, 4};
+ part.open(0, 2, partitions);
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+ part.open(1, 2, partitions);
+ Assert.assertEquals(1, part.partition("abc1", partitions.length));
+ Assert.assertEquals(1, part.partition("abc1", partitions.length));
+ }
+
+ /*
+ * Flink Sinks: Kafka Partitions
+ * 1 ------------>---> 1
+ * 2 -----------/----> 2
+ * 3 ----------/
+ */
+ @Test
+ public void testMixedCase() {
+ FixedPartitioner part = new FixedPartitioner();
+ int[] partitions = new int[]{0,1};
+
+ part.open(0, 3, partitions);
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+ part.open(1, 3, partitions);
+ Assert.assertEquals(1, part.partition("abc1", partitions.length));
+
+ part.open(2, 3, partitions);
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
new file mode 100644
index 0000000..27ad2e8
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.admin.AdminUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
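+/**
+ * Checks that an offset written with {@code ZookeeperOffsetHandler.setOffsetInZooKeeper(...)}
+ * can be read back unchanged via {@code getOffsetFromZooKeeper(...)}, using the shared
+ * ZooKeeper / Kafka test harness from {@link KafkaTestBase}.
+ */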
+public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
+
+ @Test
+ public void runOffsetManipulationInZooKeeperTest() {
+ try {
+ final String topicName = "ZookeeperOffsetHandlerTest-Topic";
+ final String groupId = "ZookeeperOffsetHandlerTest-Group";
+
+ final long offset = (long) (Math.random() * Long.MAX_VALUE);
+
+ ZkClient zkClient = createZookeeperClient();
+ AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
+
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
+
+ long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
+
+ zkClient.close();
+
+ assertEquals(offset, fetchedOffset);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
new file mode 100644
index 0000000..32377ae
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import java.util.Random;
+
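+/**
+ * Test data generators used by the Kafka connector tests: a finite (partition, counter) tuple
+ * stream, an (optionally scrambled) integer sequence, and an infinite random-string generator
+ * thread, all writing into Kafka through a {@link FlinkKafkaProducer}.
+ */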
+@SuppressWarnings("serial")
+public class DataGenerators {
+
+ public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
+ String brokerConnection, String topic,
+ int numPartitions,
+ final int from, final int to) throws Exception {
+
+ TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+ env.setParallelism(numPartitions);
+ env.getConfig().disableSysoutLogging();
+ env.setNumberOfExecutionRetries(0);
+
+ DataStream<Tuple2<Integer, Integer>> stream = env.addSource(
+ new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+ int cnt = from;
+ int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+ while (running && cnt <= to) {
+ ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+ cnt++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ stream.addSink(new FlinkKafkaProducer<>(topic,
+ new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
+ FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
+ new Tuple2Partitioner(numPartitions)
+ ));
+
+ env.execute("Data generator (Int, Int) stream to topic " + topic);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
+ String brokerConnection, String topic,
+ final int numPartitions,
+ final int numElements,
+ final boolean randomizeOrder) throws Exception {
+ env.setParallelism(numPartitions);
+ env.getConfig().disableSysoutLogging();
+ env.setNumberOfExecutionRetries(0);
+
+ DataStream<Integer> stream = env.addSource(
+ new RichParallelSourceFunction<Integer>() {
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Integer> ctx) {
+ // create a sequence
+ int[] elements = new int[numElements];
+ for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
+ i < numElements;
+ i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+
+ elements[i] = val;
+ }
+
+ // scramble the sequence
+ if (randomizeOrder) {
+ Random rnd = new Random();
+ for (int i = 0; i < elements.length; i++) {
+ int otherPos = rnd.nextInt(elements.length);
+
+ int tmp = elements[i];
+ elements[i] = elements[otherPos];
+ elements[otherPos] = tmp;
+ }
+ }
+
+ // emit the sequence
+ int pos = 0;
+ while (running && pos < elements.length) {
+ ctx.collect(elements[pos++]);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ stream
+ .rebalance()
+ .addSink(new FlinkKafkaProducer<>(topic,
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
+ FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
+ new KafkaPartitioner() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return ((Integer) key) % numPartitions;
+ }
+ }));
+
+ env.execute("Scrambles int sequence generator");
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static class InfiniteStringsGenerator extends Thread {
+
+ private final String kafkaConnectionString;
+
+ private final String topic;
+
+ private volatile Throwable error;
+
+ private volatile boolean running = true;
+
+
+ public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
+ this.kafkaConnectionString = kafkaConnectionString;
+ this.topic = topic;
+ }
+
+ @Override
+ public void run() {
+ // we manually feed data into the Kafka sink
+ FlinkKafkaProducer<String> producer = null;
+ try {
+ producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
+ producer.setRuntimeContext(new MockRuntimeContext(1,0));
+ producer.open(new Configuration());
+
+ final StringBuilder bld = new StringBuilder();
+ final Random rnd = new Random();
+
+ while (running) {
+ bld.setLength(0);
+
+ int len = rnd.nextInt(100) + 1;
+ for (int i = 0; i < len; i++) {
+ bld.append((char) (rnd.nextInt(20) + 'a') );
+ }
+
+ String next = bld.toString();
+ producer.invoke(next);
+ }
+ }
+ catch (Throwable t) {
+ this.error = t;
+ }
+ finally {
+ if (producer != null) {
+ try {
+ producer.close();
+ }
+ catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ public void shutdown() {
+ this.running = false;
+ this.interrupt();
+ }
+
+ public Throwable getError() {
+ return this.error;
+ }
+ }
+}