You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:20 UTC
[30/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
new file mode 100644
index 0000000..aa7ea49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -0,0 +1,2006 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.server.KafkaServer;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
+import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public abstract class KafkaConsumerTestBase extends KafkaTestBase {
+
+ /**
+  * JUnit {@code @Rule} field holding a {@link RetryRule}.
+  * NOTE(review): presumably this rule is what evaluates the {@code @RetryOnException}
+  * annotation placed on test methods of this class - confirm against RetryRule.
+  */
+ @Rule
+ public RetryRule retryRule = new RetryRule();
+
+
+ // ------------------------------------------------------------------------
+ // Common Test Preparation
+ // ------------------------------------------------------------------------
+
+ /**
+ * Makes sure that no job is on the JobManager any more from any previous tests that use
+ * the same mini cluster. Otherwise, missing slots may happen.
+ *
+ * <p>Runs as a JUnit {@code @Before} method and delegates the waiting to
+ * {@code JobManagerCommunicationUtils.waitUntilNoJobIsRunning}, passing the current
+ * leader gateway of the test cluster.
+ *
+ * @throws Exception propagated from obtaining the leader gateway or from the wait helper
+ */
+ @Before
+ public void ensureNoJobIsLingering() throws Exception {
+ JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Suite of Tests
+ //
+ // None of the tests here is activated directly (they carry no @Test
+ // annotation); they need to be invoked from the extending classes.
+ // That way, the extending classes can select which tests to run.
+ // ------------------------------------------------------------------------
+
+ /**
+  * Test that ensures the KafkaConsumer is properly failing if the topic doesn't exist
+  * and a wrong broker was specified.
+  *
+  * <p>The consumer is configured with broker and ZooKeeper addresses pointing to
+  * {@code localhost:80} and short client timeouts. The job is expected to fail with a
+  * {@link ProgramInvocationException} whose cause is a {@link JobExecutionException};
+  * the expected root cause depends on the version reported by {@code kafkaServer.getVersion()}.
+  * If the job finishes without any exception the test fails.
+  *
+  * @throws Exception any unexpected failure while building or submitting the job
+  */
+ public void runFailOnNoBrokerTest() throws Exception {
+     try {
+         Properties properties = new Properties();
+
+         StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+         see.getConfig().disableSysoutLogging();
+         see.setRestartStrategy(RestartStrategies.noRestart());
+         see.setParallelism(1);
+
+         // use wrong ports for the consumers
+         properties.setProperty("bootstrap.servers", "localhost:80");
+         properties.setProperty("zookeeper.connect", "localhost:80");
+         properties.setProperty("group.id", "test");
+         properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast
+         properties.setProperty("socket.timeout.ms", "3000");
+         properties.setProperty("session.timeout.ms", "2000");
+         properties.setProperty("fetch.max.wait.ms", "2000");
+         properties.setProperty("heartbeat.interval.ms", "1000");
+         properties.putAll(secureProps);
+         FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
+         DataStream<String> stream = see.addSource(source);
+         stream.print();
+         see.execute("No broker test");
+
+         // reaching this line means the job did not fail, which is exactly what this test must detect
+         // (the AssertionError thrown by fail() is not caught by the handler below)
+         fail("Job was expected to fail because no broker is reachable, but it finished without an exception");
+     } catch (ProgramInvocationException pie) {
+         // both version branches expect the same wrapper exception
+         assertTrue(pie.getCause() instanceof JobExecutionException);
+         JobExecutionException jee = (JobExecutionException) pie.getCause();
+
+         if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
+             assertTrue(jee.getCause() instanceof TimeoutException);
+
+             TimeoutException te = (TimeoutException) jee.getCause();
+
+             assertEquals("Timeout expired while fetching topic metadata", te.getMessage());
+         } else {
+             assertTrue(jee.getCause() instanceof RuntimeException);
+
+             RuntimeException re = (RuntimeException) jee.getCause();
+
+             assertTrue(re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
+         }
+     }
+ }
+
+ /**
+  * Ensures that the committed offsets to Kafka are the offsets of "the next record to process".
+  *
+  * <p>Writes a sequence to a 3-partition topic, runs a checkpointed consumer job in a
+  * background thread and polls the committed offsets (for up to 30 seconds) until every
+  * partition reports the expected offset. The job is then cancelled and the offsets are
+  * verified once more. The offset handler is closed even when the test fails.
+  *
+  * @throws Exception if the job fails with anything other than a cancellation, or on test infrastructure errors
+  */
+ public void runCommitOffsetsToKafka() throws Exception {
+     // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+     final int parallelism = 3;
+     final int recordsInEachPartition = 50;
+     // the committed offset is the offset of the next record to read, i.e. the number of records written
+     final Long expectedOffset = Long.valueOf(recordsInEachPartition);
+
+     final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.getConfig().disableSysoutLogging();
+     env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+     env.setParallelism(parallelism);
+     env.enableCheckpointing(200);
+
+     DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
+     stream.addSink(new DiscardingSink<String>());
+
+     final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+     final Thread runner = new Thread("runner") {
+         @Override
+         public void run() {
+             try {
+                 env.execute();
+             }
+             catch (Throwable t) {
+                 // the job is cancelled on purpose further down; only other failures are errors
+                 if (!(t.getCause() instanceof JobCancellationException)) {
+                     errorRef.set(t);
+                 }
+             }
+         }
+     };
+     runner.start();
+
+     final long deadline = 30000 + System.currentTimeMillis();
+
+     KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+     try {
+         // poll until all partitions report the expected offset or the deadline passes
+         do {
+             Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+             Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+             Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+             if (expectedOffset.equals(o1) && expectedOffset.equals(o2) && expectedOffset.equals(o3)) {
+                 break;
+             }
+
+             Thread.sleep(100);
+         }
+         while (System.currentTimeMillis() < deadline);
+
+         // cancel the job
+         JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+         final Throwable t = errorRef.get();
+         if (t != null) {
+             throw new RuntimeException("Job failed with an exception", t);
+         }
+
+         // final check to see if offsets are correctly in Kafka
+         Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+         Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+         Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+         Assert.assertEquals(expectedOffset, o1);
+         Assert.assertEquals(expectedOffset, o2);
+         Assert.assertEquals(expectedOffset, o3);
+     }
+     finally {
+         // release the handler on failure paths as well
+         kafkaOffsetHandler.close();
+     }
+
+     deleteTestTopic(topicName);
+ }
+
+ /**
+  * This test first writes 300 records to each of the 3 partitions of a test topic, runs a job
+  * that stops after one of its mapper instances has seen 150 records so that some offsets are
+  * committed to Kafka, and then starts the consumer again to read the remaining records starting
+  * from the committed offsets.
+  * The test ensures that whatever offsets were committed to Kafka, the consumer correctly picks them up
+  * and starts at the correct position.
+  *
+  * <p>The first phase is repeated (at most 3 times) as long as no partition has a committed offset.
+  * The offset handler is closed even when the test fails.
+  *
+  * @throws Exception if no offsets are committed after all attempts, or on any job / infrastructure failure
+  */
+ public void runStartFromKafkaCommitOffsets() throws Exception {
+     final int parallelism = 3;
+     final int recordsInEachPartition = 300;
+     final int maxAttempts = 3;
+
+     final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+     KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+     try {
+         Long o1;
+         Long o2;
+         Long o3;
+         int attempt = 0;
+         // make sure that o1, o2, o3 are not all null before proceeding
+         do {
+             attempt++;
+             LOG.info("Attempt {} to read records and commit some offsets to Kafka", attempt);
+
+             final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+             env.getConfig().disableSysoutLogging();
+             env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+             env.setParallelism(parallelism);
+             env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
+
+             env
+                 .addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+                 .map(new ThrottledMapper<String>(50))
+                 .map(new MapFunction<String, Object>() {
+                     int count = 0;
+                     @Override
+                     public Object map(String value) throws Exception {
+                         count++;
+                         // stop the job part-way through so that only some offsets get committed
+                         if (count == 150) {
+                             throw new SuccessException();
+                         }
+                         return null;
+                     }
+                 })
+                 .addSink(new DiscardingSink<>());
+
+             tryExecute(env, "Read some records to commit offsets to Kafka");
+
+             o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+             o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+             o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+         } while (o1 == null && o2 == null && o3 == null && attempt < maxAttempts);
+
+         if (o1 == null && o2 == null && o3 == null) {
+             throw new RuntimeException("No offsets have been committed after " + maxAttempts + " attempts");
+         }
+
+         LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
+
+         final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+         env2.getConfig().disableSysoutLogging();
+         env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+         env2.setParallelism(parallelism);
+
+         // whatever offsets were committed for each partition, the consumer should pick
+         // them up and start from the correct position so that the remaining records are all read
+         HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
+         partitionsToValuesCountAndStartOffset.put(0, expectedCountAndStartOffset(o1, recordsInEachPartition));
+         partitionsToValuesCountAndStartOffset.put(1, expectedCountAndStartOffset(o2, recordsInEachPartition));
+         partitionsToValuesCountAndStartOffset.put(2, expectedCountAndStartOffset(o3, recordsInEachPartition));
+
+         readSequence(env2, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+     }
+     finally {
+         // release the handler on failure paths as well
+         kafkaOffsetHandler.close();
+     }
+
+     deleteTestTopic(topicName);
+ }
+
+ /**
+  * Computes, for one partition, how many records remain to be read and at which value the
+  * remaining sequence starts, given the offset committed for that partition.
+  *
+  * @param committedOffset the committed offset, or {@code null} if nothing was committed
+  * @param recordsInPartition number of records written to the partition
+  * @return tuple of (remaining record count, first expected value); a {@code null} offset
+  *         means the whole partition is expected, starting at 0
+  */
+ private static Tuple2<Integer, Integer> expectedCountAndStartOffset(Long committedOffset, int recordsInPartition) {
+     if (committedOffset == null) {
+         return new Tuple2<>(recordsInPartition, 0);
+     }
+     return new Tuple2<>((int) (recordsInPartition - committedOffset), committedOffset.intValue());
+ }
+
+ /**
+  * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset
+  * is committed to Kafka, even if some partitions are not read.
+  *
+  * Test:
+  * - Create 3 partitions
+  * - write 50 messages into each.
+  * - Start three consumers with auto.offset.reset='latest' and wait until they committed into Kafka.
+  * - Check if the offsets in Kafka are set to 50 for the three partitions
+  *
+  * See FLINK-3440 as well
+  *
+  * <p>The committed offsets are polled for up to 30 seconds before the job is cancelled and the
+  * final verification runs. The offset handler is closed even when the test fails.
+  *
+  * @throws Exception if the job fails with anything other than a cancellation, or on test infrastructure errors
+  */
+ public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+     // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+     final int parallelism = 3;
+     final int recordsInEachPartition = 50;
+     // the committed offset is the offset of the next record to read, i.e. the number of records written
+     final Long expectedOffset = Long.valueOf(recordsInEachPartition);
+
+     final String topicName = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", recordsInEachPartition, parallelism, 1);
+
+     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.getConfig().disableSysoutLogging();
+     env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+     env.setParallelism(parallelism);
+     env.enableCheckpointing(200);
+
+     Properties readProps = new Properties();
+     readProps.putAll(standardProps);
+     readProps.setProperty("auto.offset.reset", "latest"); // set to reset to latest, so that partitions are initially not read
+
+     DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
+     stream.addSink(new DiscardingSink<String>());
+
+     final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+     final Thread runner = new Thread("runner") {
+         @Override
+         public void run() {
+             try {
+                 env.execute();
+             }
+             catch (Throwable t) {
+                 // the job is cancelled on purpose further down; only other failures are errors
+                 if (!(t.getCause() instanceof JobCancellationException)) {
+                     errorRef.set(t);
+                 }
+             }
+         }
+     };
+     runner.start();
+
+     KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
+     try {
+         final long deadline = 30000 + System.currentTimeMillis();
+         // poll until all partitions report the expected offset or the deadline passes
+         do {
+             Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+             Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+             Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+
+             if (expectedOffset.equals(o1) && expectedOffset.equals(o2) && expectedOffset.equals(o3)) {
+                 break;
+             }
+
+             Thread.sleep(100);
+         }
+         while (System.currentTimeMillis() < deadline);
+
+         // cancel the job
+         JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+         final Throwable t = errorRef.get();
+         if (t != null) {
+             throw new RuntimeException("Job failed with an exception", t);
+         }
+
+         // final check to see if offsets are correctly in Kafka
+         Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
+         Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
+         Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
+         Assert.assertEquals(expectedOffset, o1);
+         Assert.assertEquals(expectedOffset, o2);
+         Assert.assertEquals(expectedOffset, o3);
+     }
+     finally {
+         // release the handler on failure paths as well
+         kafkaOffsetHandler.close();
+     }
+
+     deleteTestTopic(topicName);
+ }
+
+ /**
+  * Ensure Kafka is working on both producer and consumer side.
+  * This executes a job that contains two Flink pipelines.
+  *
+  * <pre>
+  * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
+  * </pre>
+  *
+  * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
+  * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
+  * cause the test to fail.
+  *
+  * This test also ensures that FLINK-3156 doesn't happen again:
+  *
+  * The following situation caused a NPE in the FlinkKafkaConsumer
+  *
+  * topic-1 <-- elements are only produced into topic1.
+  * topic-2
+  *
+  * Therefore, this test is consuming as well from an empty topic.
+  *
+  * @throws Exception a {@code NotLeaderForPartitionException} found in the cause chain is rethrown
+  *         directly (it is the exception named in the retry annotation); any other failure is
+  *         propagated unchanged
+  */
+ @RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
+ public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
+     final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
+     final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();
+
+     final int parallelism = 3;
+     final int elementsPerPartition = 100;
+     final int totalElements = parallelism * elementsPerPartition;
+
+     createTestTopic(topic, parallelism, 2);
+     createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will remain empty all the time
+
+     final StreamExecutionEnvironment env =
+         StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.setParallelism(parallelism);
+     env.enableCheckpointing(500);
+     env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
+     env.getConfig().disableSysoutLogging();
+
+     TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
+
+     TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
+         new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+
+     TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
+         new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+
+     // ----------- add producer dataflow ----------
+
+     DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
+
+         // written by cancel() and read by the loop in run(); volatile so the update is visible across threads
+         private volatile boolean running = true;
+
+         @Override
+         public void run(SourceContext<Tuple2<Long, String>> ctx) throws InterruptedException {
+             // every subtask emits its own disjoint range of elementsPerPartition values
+             int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
+             int limit = cnt + elementsPerPartition;
+
+             while (running && cnt < limit) {
+                 ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
+                 cnt++;
+                 // we delay data generation a bit so that we are sure that some checkpoints are
+                 // triggered (for FLINK-3156)
+                 Thread.sleep(50);
+             }
+         }
+
+         @Override
+         public void cancel() {
+             running = false;
+         }
+     });
+     Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+     producerProperties.setProperty("retries", "3");
+     producerProperties.putAll(secureProps);
+     kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
+
+     // ----------- add consumer dataflow ----------
+
+     List<String> topics = new ArrayList<>();
+     topics.add(topic);
+     topics.add(additionalEmptyTopic);
+
+     Properties props = new Properties();
+     props.putAll(standardProps);
+     props.putAll(secureProps);
+     FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props);
+
+     DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
+
+     consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+
+         private int elCnt = 0;
+         private BitSet validator = new BitSet(totalElements);
+
+         @Override
+         public void invoke(Tuple2<Long, String> value) throws Exception {
+             // the string payload is "kafka-<n>" and the long key is 1000 + <n>
+             String[] sp = value.f1.split("-");
+             int v = Integer.parseInt(sp[1]);
+
+             assertEquals(value.f0 - 1000, (long) v);
+
+             assertFalse("Received tuple twice", validator.get(v));
+             validator.set(v);
+             elCnt++;
+
+             if (elCnt == totalElements) {
+                 // check if everything in the bitset is set to true
+                 int nc;
+                 if ((nc = validator.nextClearBit(0)) != totalElements) {
+                     fail("The bitset was not set to 1 on all elements. Next clear:"
+                         + nc + " Set: " + validator);
+                 }
+                 throw new SuccessException();
+             }
+         }
+     }).setParallelism(1);
+
+     try {
+         tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
+     }
+     catch (ProgramInvocationException | JobExecutionException e) {
+         // walk the cause chain (bounded to 20 levels) looking for a NotLeaderForPartitionException
+         // and rethrow it directly, since that is the exception type named in @RetryOnException
+         Throwable cause = e.getCause();
+
+         int depth = 0;
+         while (cause != null && depth++ < 20) {
+             if (cause instanceof kafka.common.NotLeaderForPartitionException) {
+                 throw (Exception) cause;
+             }
+             cause = cause.getCause();
+         }
+         throw e;
+     }
+
+     // clean up both topics created by this test
+     deleteTestTopic(topic);
+     deleteTestTopic(additionalEmptyTopic);
+ }
+
+ /**
+  * Verifies exactly-once consumption when each Flink source subtask is matched with exactly
+  * one Kafka partition (source parallelism equals the partition count).
+  *
+  * <p>The topic is filled with a randomized integer sequence, then read by a job whose
+  * identity mapper fails once part-way through; a single-parallelism validating sink checks
+  * the result after recovery.
+  *
+  * @throws Exception on any job or test infrastructure failure
+  */
+ public void runOneToOneExactlyOnceTest() throws Exception {
+     final String topic = "oneToOneTopic";
+     final int parallelism = 5;
+     final int numElementsPerPartition = 1000;
+     final int failAfterElements = numElementsPerPartition / 3;
+     final int totalElements = parallelism * numElementsPerPartition;
+
+     createTestTopic(topic, parallelism, 1);
+
+     // fill the topic
+     final StreamExecutionEnvironment generatorEnv =
+         StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     DataGenerators.generateRandomizedIntegerSequence(
+         generatorEnv, kafkaServer, topic, parallelism, numElementsPerPartition, true);
+
+     // set up the topology that fails once and recovers
+     StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.getConfig().disableSysoutLogging();
+     env.setParallelism(parallelism);
+     env.enableCheckpointing(500);
+     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
+     DeserializationSchema<Integer> intSchema =
+         new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+     Properties consumerProps = new Properties();
+     consumerProps.putAll(standardProps);
+     consumerProps.putAll(secureProps);
+
+     FlinkKafkaConsumerBase<Integer> consumer = kafkaServer.getConsumer(topic, intSchema, consumerProps);
+
+     DataStream<Integer> records = env.addSource(consumer);
+     records
+         .map(new PartitionValidatingMapper(parallelism, 1))
+         .map(new FailingIdentityMapper<Integer>(failAfterElements))
+         .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+     // reset the shared failure flag right before running the job
+     FailingIdentityMapper.failedBefore = false;
+     tryExecute(env, "One-to-one exactly once test");
+
+     deleteTestTopic(topic);
+ }
+
+ /**
+  * Verifies exactly-once consumption when there are fewer Flink source subtasks than Kafka
+  * partitions, so that a single source subtask reads several partitions.
+  *
+  * <p>The topic is filled with a randomized integer sequence, then read by a job whose
+  * identity mapper fails once part-way through; a single-parallelism validating sink checks
+  * the result after recovery.
+  *
+  * @throws Exception on any job or test infrastructure failure
+  */
+ public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
+     final String topic = "oneToManyTopic";
+     final int parallelism = 2;
+     final int numPartitions = 5;
+     final int numElementsPerPartition = 1000;
+     final int failAfterElements = numElementsPerPartition / 3;
+     final int totalElements = numPartitions * numElementsPerPartition;
+
+     createTestTopic(topic, numPartitions, 1);
+
+     // fill the topic
+     final StreamExecutionEnvironment generatorEnv =
+         StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     DataGenerators.generateRandomizedIntegerSequence(
+         generatorEnv, kafkaServer, topic, numPartitions, numElementsPerPartition, false);
+
+     // set up the topology that fails once and recovers
+     StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.getConfig().disableSysoutLogging();
+     env.setParallelism(parallelism);
+     env.enableCheckpointing(500);
+     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
+     DeserializationSchema<Integer> intSchema =
+         new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+     Properties consumerProps = new Properties();
+     consumerProps.putAll(standardProps);
+     consumerProps.putAll(secureProps);
+     FlinkKafkaConsumerBase<Integer> consumer = kafkaServer.getConsumer(topic, intSchema, consumerProps);
+
+     DataStream<Integer> records = env.addSource(consumer);
+     records
+         .map(new PartitionValidatingMapper(numPartitions, 3))
+         .map(new FailingIdentityMapper<Integer>(failAfterElements))
+         .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+     // reset the shared failure flag right before running the job
+     FailingIdentityMapper.failedBefore = false;
+     tryExecute(env, "One-source-multi-partitions exactly once test");
+
+     deleteTestTopic(topic);
+ }
+
+ /**
+  * Verifies exactly-once consumption when there are more Flink source subtasks than Kafka
+  * partitions, which means that some source subtasks have no partition to read.
+  *
+  * <p>The topic is filled with a randomized integer sequence, then read by a job whose
+  * identity mapper fails once part-way through; a single-parallelism validating sink checks
+  * the result after recovery.
+  *
+  * @throws Exception on any job or test infrastructure failure
+  */
+ public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
+     final String topic = "manyToOneTopic";
+     final int parallelism = 8;
+     final int numPartitions = 5;
+     final int numElementsPerPartition = 1000;
+     final int failAfterElements = numElementsPerPartition / 3;
+     final int totalElements = numPartitions * numElementsPerPartition;
+
+     createTestTopic(topic, numPartitions, 1);
+
+     // fill the topic
+     final StreamExecutionEnvironment generatorEnv =
+         StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     DataGenerators.generateRandomizedIntegerSequence(
+         generatorEnv, kafkaServer, topic, numPartitions, numElementsPerPartition, true);
+
+     // set up the topology that fails once and recovers
+     StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.getConfig().disableSysoutLogging();
+     env.setBufferTimeout(0);
+     env.setParallelism(parallelism);
+     env.enableCheckpointing(500);
+     // set the number of restarts to one. The failing mapper will fail once, then it's only success exceptions.
+     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
+     DeserializationSchema<Integer> intSchema =
+         new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+     Properties consumerProps = new Properties();
+     consumerProps.putAll(standardProps);
+     consumerProps.putAll(secureProps);
+     FlinkKafkaConsumerBase<Integer> consumer = kafkaServer.getConsumer(topic, intSchema, consumerProps);
+
+     DataStream<Integer> records = env.addSource(consumer);
+     records
+         .map(new PartitionValidatingMapper(numPartitions, 1))
+         .map(new FailingIdentityMapper<Integer>(failAfterElements))
+         .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+     // reset the shared failure flag right before running the job
+     FailingIdentityMapper.failedBefore = false;
+     tryExecute(env, "multi-source-one-partitions exactly once test");
+
+     deleteTestTopic(topic);
+ }
+
+
+ /**
+  * Tests that the source can be properly canceled when reading full partitions.
+  *
+  * <p>A generator thread keeps writing strings into the topic while a consumer job runs in a
+  * background thread. After two seconds the job is cancelled and the test verifies that the
+  * job terminated with a "Job was cancelled" failure and that the generator was still alive.
+  * The generator thread is shut down even when the test fails.
+  *
+  * @throws Exception on any test infrastructure failure
+  */
+ public void runCancelingOnFullInputTest() throws Exception {
+     final String topic = "cancelingOnFullTopic";
+
+     final int parallelism = 3;
+     createTestTopic(topic, parallelism, 1);
+
+     // launch a producer thread
+     DataGenerators.InfiniteStringsGenerator generator =
+         new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
+     generator.start();
+
+     try {
+         // launch a consumer asynchronously
+
+         final AtomicReference<Throwable> jobError = new AtomicReference<>();
+
+         final Runnable jobRunner = new Runnable() {
+             @Override
+             public void run() {
+                 try {
+                     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                     env.setParallelism(parallelism);
+                     env.enableCheckpointing(100);
+                     env.getConfig().disableSysoutLogging();
+
+                     Properties props = new Properties();
+                     props.putAll(standardProps);
+                     props.putAll(secureProps);
+                     FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
+
+                     env.addSource(source).addSink(new DiscardingSink<String>());
+
+                     env.execute("Runner for CancelingOnFullInputTest");
+                 }
+                 catch (Throwable t) {
+                     // cancellation is expected to surface here; the main thread inspects it
+                     jobError.set(t);
+                 }
+             }
+         };
+
+         Thread runnerThread = new Thread(jobRunner, "program runner thread");
+         runnerThread.start();
+
+         // wait a bit before canceling
+         Thread.sleep(2000);
+
+         Throwable failureCause = jobError.get();
+         if (failureCause != null) {
+             LOG.error("Job failed before it could be canceled", failureCause);
+             Assert.fail("Test failed prematurely with: " + failureCause.getMessage());
+         }
+
+         // cancel
+         JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
+
+         // wait for the program to be done and validate that we failed with the right exception
+         runnerThread.join();
+
+         failureCause = jobError.get();
+         assertNotNull("program did not fail properly due to canceling", failureCause);
+         // guard against a null message so that a wrong failure shows up as an assertion, not an NPE
+         final String failureMessage = failureCause.getMessage();
+         assertTrue("Unexpected failure message: " + failureMessage,
+             failureMessage != null && failureMessage.contains("Job was cancelled"));
+
+         // the generator must have been running for the whole test
+         if (!generator.isAlive()) {
+             Throwable t = generator.getError();
+             if (t != null) {
+                 LOG.error("Generator failed", t);
+                 fail("Generator failed: " + t.getMessage());
+             } else {
+                 fail("Generator failed with no exception");
+             }
+         }
+     }
+     finally {
+         // stop the producer thread on failure paths as well
+         if (generator.isAlive()) {
+             generator.shutdown();
+             generator.join();
+         }
+     }
+
+     deleteTestTopic(topic);
+ }
+
+ /**
+  * Verifies that a Kafka source can be canceled cleanly while reading from a topic into which
+  * this test writes no data.
+  *
+  * <p>The consuming job is executed from a separate thread. After a two second grace period the
+  * currently running job is canceled, and the test expects the runner thread to have recorded
+  * a failure whose message contains "Job was cancelled".
+  */
+ public void runCancelingOnEmptyInputTest() throws Exception {
+     final String topic = "cancelingOnEmptyInputTopic";
+     final int parallelism = 3;
+
+     createTestTopic(topic, parallelism, 1);
+
+     // filled by the runner thread with whatever terminates the job
+     final AtomicReference<Throwable> error = new AtomicReference<>();
+
+     Thread runnerThread = new Thread(new Runnable() {
+         @Override
+         public void run() {
+             try {
+                 final StreamExecutionEnvironment env =
+                         StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                 env.setParallelism(parallelism);
+                 env.enableCheckpointing(100);
+                 env.getConfig().disableSysoutLogging();
+
+                 Properties consumerProps = new Properties();
+                 consumerProps.putAll(standardProps);
+                 consumerProps.putAll(secureProps);
+                 FlinkKafkaConsumerBase<String> consumer =
+                         kafkaServer.getConsumer(topic, new SimpleStringSchema(), consumerProps);
+
+                 env.addSource(consumer).addSink(new DiscardingSink<String>());
+
+                 env.execute("CancelingOnEmptyInputTest");
+             }
+             catch (Throwable t) {
+                 LOG.error("Job Runner failed with exception", t);
+                 error.set(t);
+             }
+         }
+     }, "program runner thread");
+     runnerThread.start();
+
+     // give the job some time to come up before canceling it
+     Thread.sleep(2000);
+
+     // an error at this point means the job died before we even tried to cancel it
+     Throwable prematureFailure = error.get();
+     if (prematureFailure != null) {
+         prematureFailure.printStackTrace();
+         Assert.fail("Test failed prematurely with: " + prematureFailure.getMessage());
+     }
+
+     // cancel
+     JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+
+     // the runner must terminate, and it must do so because of the cancellation
+     runnerThread.join();
+
+     Throwable cancellationCause = error.get();
+     assertNotNull("program did not fail properly due to canceling", cancellationCause);
+     assertTrue(cancellationCause.getMessage().contains("Job was cancelled"));
+
+     deleteTestTopic(topic);
+ }
+
+ /**
+  * Tests that a job which requests more parallel instances than the test cluster is expected
+  * to offer fails when it is submitted, and that the failure can be traced back to a
+  * {@link NoResourceAvailableException}.
+  *
+  * <p>The job is expected to throw a {@link ProgramInvocationException}; its cause chain is
+  * searched (up to 20 levels deep) for the resource exception.
+  */
+ public void runFailOnDeployTest() throws Exception {
+     final String topic = "failOnDeployTopic";
+
+     createTestTopic(topic, 2, 1);
+
+     DeserializationSchema<Integer> schema =
+             new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+     StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.setParallelism(12); // needs to be more than the mini cluster has slots
+     env.getConfig().disableSysoutLogging();
+
+     Properties props = new Properties();
+     props.putAll(standardProps);
+     props.putAll(secureProps);
+     FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
+
+     env
+             .addSource(kafkaSource)
+             .addSink(new DiscardingSink<Integer>());
+
+     try {
+         env.execute("test fail on deploy");
+         fail("this test should fail with an exception");
+     }
+     catch (ProgramInvocationException e) {
+         // validate that we failed due to a NoResourceAvailableException; the depth limit
+         // guards against cyclic cause chains
+         boolean foundResourceException = false;
+         Throwable cause = e.getCause();
+
+         for (int depth = 0; cause != null && depth < 20; depth++) {
+             if (cause instanceof NoResourceAvailableException) {
+                 foundResourceException = true;
+                 break;
+             }
+             cause = cause.getCause();
+         }
+
+         assertTrue("Wrong exception", foundResourceException);
+     }
+
+     deleteTestTopic(topic);
+ }
+
+ /**
+ * Test producing and consuming into multiple topics
+ * @throws java.lang.Exception
+ */
+ public void runProduceConsumeMultipleTopics() throws java.lang.Exception {
+ final int NUM_TOPICS = 5;
+ final int NUM_ELEMENTS = 20;
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.getConfig().disableSysoutLogging();
+
+ // create topics with content
+ final List<String> topics = new ArrayList<>();
+ for (int i = 0; i < NUM_TOPICS; i++) {
+ final String topic = "topic-" + i;
+ topics.add(topic);
+ // create topic
+ createTestTopic(topic, i + 1 /*partitions*/, 1);
+ }
+ // run first job, producing into all topics
+ DataStream<Tuple3<Integer, Integer, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() {
+
+ @Override
+ public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) throws Exception {
+ int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+ for (int topicId = 0; topicId < NUM_TOPICS; topicId++) {
+ for (int i = 0; i < NUM_ELEMENTS; i++) {
+ ctx.collect(new Tuple3<>(partition, i, "topic-" + topicId));
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ });
+
+ Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
+
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null);
+
+ env.execute("Write to topics");
+
+ // run second job consuming from multiple topics
+ env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.getConfig().disableSysoutLogging();
+
+ stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
+
+ stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
+ Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
+ @Override
+ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception {
+ Integer count = countPerTopic.get(value.f2);
+ if (count == null) {
+ count = 1;
+ } else {
+ count++;
+ }
+ countPerTopic.put(value.f2, count);
+
+ // check map:
+ for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) {
+ if (el.getValue() < NUM_ELEMENTS) {
+ break; // not enough yet
+ }
+ if (el.getValue() > NUM_ELEMENTS) {
+ throw new RuntimeException("There is a failure in the test. I've read " +
+ el.getValue() + " from topic " + el.getKey());
+ }
+ }
+ // we've seen messages from all topics
+ throw new SuccessException();
+ }
+ }).setParallelism(1);
+
+ tryExecute(env, "Count elements from the topics");
+
+
+ // delete all topics again
+ for (int i = 0; i < NUM_TOPICS; i++) {
+ final String topic = "topic-" + i;
+ deleteTestTopic(topic);
+ }
+ }
+
+ /**
+  * Keyed serialization schema for raw {@code byte[]} records: the record itself is used
+  * unchanged as the message value, no key is produced, and no per-record target topic is
+  * specified.
+  */
+ private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> {
+
+     /** Passes the record through as the message value. */
+     @Override
+     public byte[] serializeValue(byte[] element) {
+         return element;
+     }
+
+     /** Records are written without a key. */
+     @Override
+     public byte[] serializeKey(byte[] element) {
+         return null;
+     }
+
+     /** No record-specific topic is chosen. */
+     @Override
+     public String getTargetTopic(byte[] element) {
+         return null;
+     }
+ }
+
+ /**
+  * Combined serialization / deserialization schema for {@code Tuple3<Integer, Integer, String>}
+  * records whose third field is a topic name.
+  *
+  * <p>Only the two integer fields are written as the message value; the String field is used as
+  * the target topic when writing and is filled with the source topic when reading.
+  */
+ private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+         KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+     /** Serializer for the (Integer, Integer) payload that goes into the message value. */
+     private final TypeSerializer<Tuple2<Integer, Integer>> payloadSerializer;
+
+     public Tuple2WithTopicSchema(ExecutionConfig ec) {
+         TypeInformation<Tuple2<Integer, Integer>> payloadType =
+                 TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>");
+         this.payloadSerializer = payloadType.createSerializer(ec);
+     }
+
+     // ---- serialization ----
+
+     @Override
+     public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+         // records are written without a key
+         return null;
+     }
+
+     @Override
+     public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+         DataOutputView view = new DataOutputViewStreamWrapper(bytes);
+         Tuple2<Integer, Integer> payload = new Tuple2<>(element.f0, element.f1);
+         try {
+             payloadSerializer.serialize(payload, view);
+         } catch (IOException e) {
+             throw new RuntimeException("Error" ,e);
+         }
+         return bytes.toByteArray();
+     }
+
+     @Override
+     public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+         // the third field names the topic this record is routed to
+         return element.f2;
+     }
+
+     // ---- deserialization ----
+
+     @Override
+     public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+         DataInputView view = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+         Tuple2<Integer, Integer> payload = payloadSerializer.deserialize(view);
+         // re-attach the topic the record was read from as the third field
+         return new Tuple3<>(payload.f0, payload.f1, topic);
+     }
+
+     @Override
+     public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+         // this schema never ends the stream
+         return false;
+     }
+
+     @Override
+     public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+         return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+     }
+ }
+
+ /**
+  * Test Flink's Kafka integration also with very big records (payloads between 7 MB and
+  * just under 14 MB).
+  * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+  *
+  * <p>A single job both produces and consumes: the source emits ten large records followed by
+  * a small end marker with key {@code -1}, and the consuming sink signals success when the
+  * marker arrives as the eleventh element.
+  */
+ public void runBigRecordTestTopology() throws Exception {
+
+     final String topic = "bigRecordTestTopic";
+     final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
+
+     createTestTopic(topic, parallelism, 1);
+
+     final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+
+     final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
+             new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+
+     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.setRestartStrategy(RestartStrategies.noRestart());
+     env.getConfig().disableSysoutLogging();
+     env.enableCheckpointing(100);
+     env.setParallelism(parallelism);
+
+     // add consuming topology:
+     Properties consumerProps = new Properties();
+     consumerProps.putAll(standardProps);
+     // fetch sizes must be able to hold the largest record (just under 14 MB)
+     consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
+     consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher
+     consumerProps.setProperty("queued.max.message.chunks", "1");
+     consumerProps.putAll(secureProps);
+
+     FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
+     DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
+
+     consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+
+         private int elCnt = 0;
+
+         @Override
+         public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+             elCnt++;
+             if (value.f0 == -1) {
+                 // end marker: we should have seen 11 elements now.
+                 if (elCnt == 11) {
+                     throw new SuccessException();
+                 } else {
+                     throw new RuntimeException("There have been "+elCnt+" elements");
+                 }
+             }
+             if (elCnt > 10) {
+                 throw new RuntimeException("More than 10 elements seen: "+elCnt);
+             }
+         }
+     });
+
+     // add producing topology
+     Properties producerProps = new Properties();
+     producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15));
+     producerProps.setProperty("retries", "3");
+     producerProps.putAll(secureProps);
+     producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
+
+     DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+
+         // written by cancel() from a different thread than the one executing run()
+         private volatile boolean running;
+
+         @Override
+         public void open(Configuration parameters) throws Exception {
+             super.open(parameters);
+             running = true;
+         }
+
+         @Override
+         public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
+             Random rnd = new Random();
+             long cnt = 0;
+             int sevenMb = 1024 * 1024 * 7;
+
+             while (running) {
+                 // payload size is in [7 MB, 14 MB)
+                 byte[] wl = new byte[sevenMb + rnd.nextInt(sevenMb)];
+                 ctx.collect(new Tuple2<>(cnt++, wl));
+
+                 Thread.sleep(100);
+
+                 if (cnt == 10) {
+                     // signal end
+                     ctx.collect(new Tuple2<>(-1L, new byte[]{1}));
+                     break;
+                 }
+             }
+         }
+
+         @Override
+         public void cancel() {
+             running = false;
+         }
+     });
+
+     kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null);
+
+     tryExecute(env, "big topology test");
+     deleteTestTopic(topic);
+ }
+
+
+ /**
+  * Runs a validating consumer job against a topic with replication factor 2 while a
+  * {@code BrokerKillingMapper} in the pipeline is given the id of the broker selected by
+  * {@code kafkaServer.getLeaderToShutDown(topic)}. The consumers must handle the resulting
+  * failures; the broker with that id is restarted at the end of the test.
+  */
+ public void runBrokerFailureTest() throws Exception {
+     final String topic = "brokerFailureTestTopic";
+
+     final int parallelism = 2;
+     final int numElementsPerPartition = 1000;
+     final int totalElements = parallelism * numElementsPerPartition;
+     final int failAfterElements = numElementsPerPartition / 3;
+
+     // two replicas per partition
+     createTestTopic(topic, parallelism, 2);
+
+     // fill the topic
+     StreamExecutionEnvironment generatorEnv =
+             StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     DataGenerators.generateRandomizedIntegerSequence(
+             generatorEnv, kafkaServer, topic, parallelism, numElementsPerPartition, true);
+
+     // find leader to shut down
+     final int leaderId = kafkaServer.getLeaderToShutDown(topic);
+     LOG.info("Leader to shutdown {}", leaderId);
+
+     // run the topology (the consumers must handle the failures)
+     StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.getConfig().disableSysoutLogging();
+     env.setRestartStrategy(RestartStrategies.noRestart());
+     env.enableCheckpointing(500);
+     env.setParallelism(parallelism);
+
+     Properties consumerProps = new Properties();
+     consumerProps.putAll(standardProps);
+     consumerProps.putAll(secureProps);
+
+     DeserializationSchema<Integer> intSchema =
+             new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+     FlinkKafkaConsumerBase<Integer> consumer = kafkaServer.getConsumer(topic, intSchema, consumerProps);
+
+     env
+             .addSource(consumer)
+             .map(new PartitionValidatingMapper(parallelism, 1))
+             .map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements))
+             .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+     // reset the static flag before the job starts
+     BrokerKillingMapper.killedLeaderBefore = false;
+     tryExecute(env, "Broker failure once test");
+
+     // start a new broker:
+     kafkaServer.restartBroker(leaderId);
+ }
+
+ public void runKeyValueTest() throws Exception {
+ final String topic = "keyvaluetest";
+ createTestTopic(topic, 1, 1);
+ final int ELEMENT_COUNT = 5000;
+
+ // ----------- Write some data into Kafka -------------------
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(1);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+
+ DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() {
+ @Override
+ public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
+ Random rnd = new Random(1337);
+ for (long i = 0; i < ELEMENT_COUNT; i++) {
+ PojoValue pojo = new PojoValue();
+ pojo.when = new Date(rnd.nextLong());
+ pojo.lon = rnd.nextLong();
+ pojo.lat = i;
+ // make every second key null to ensure proper "null" serialization
+ Long key = (i % 2 == 0) ? null : i;
+ ctx.collect(new Tuple2<>(key, pojo));
+ }
+ }
+ @Override
+ public void cancel() {
+ }
+ });
+
+ KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
+ Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+ producerProperties.setProperty("retries", "3");
+ kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
+ env.execute("Write KV to Kafka");
+
+ // ----------- Read the data again -------------------
+
+ env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(1);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+
+
+ KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
+
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+ DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props));
+ fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
+ long counter = 0;
+ @Override
+ public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
+ // the elements should be in order.
+ Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
+ if (value.f1.lat % 2 == 0) {
+ assertNull("key was not null", value.f0);
+ } else {
+ Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
+ }
+ counter++;
+ if (counter == ELEMENT_COUNT) {
+ // we got the right number of elements
+ throw new SuccessException();
+ }
+ }
+ });
+
+ tryExecute(env, "Read KV from Kafka");
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+  * Simple value type used as the record value in the key/value tests of this class.
+  * It exposes three public fields and a public no-argument constructor.
+  */
+ public static class PojoValue {
+ public Date when;
+ public long lon;
+ public long lat;
+ public PojoValue() {}
+ }
+
+
+ /**
+  * Tests the handling of records that have a key but a {@code null} value ("deletes").
+  *
+  * <p>The first job writes {@code ELEMENT_COUNT} records with random 200-byte keys and
+  * {@code null} values. The second job reads them back, asserts that every value arrives as
+  * {@code null}, and signals success after {@code ELEMENT_COUNT} records.
+  *
+  * @throws Exception if topic management or one of the two jobs fails
+  */
+ public void runAllDeletesTest() throws Exception {
+     final String topic = "alldeletestest";
+     final int ELEMENT_COUNT = 300;
+
+     createTestTopic(topic, 1, 1);
+
+     // ----------- Write some data into Kafka -------------------
+
+     StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.getConfig().disableSysoutLogging();
+     env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+     env.setParallelism(1);
+
+     SourceFunction<Tuple2<byte[], PojoValue>> deleteGenerator = new SourceFunction<Tuple2<byte[], PojoValue>>() {
+         @Override
+         public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception {
+             Random rnd = new Random(1337);
+             long emitted = 0;
+             while (emitted < ELEMENT_COUNT) {
+                 final byte[] key = new byte[200];
+                 rnd.nextBytes(key);
+                 // key only, the value stays null
+                 ctx.collect(new Tuple2<>(key, (PojoValue) null));
+                 emitted++;
+             }
+         }
+
+         @Override
+         public void cancel() {
+         }
+     };
+     DataStream<Tuple2<byte[], PojoValue>> kvStream = env.addSource(deleteGenerator);
+
+     // the same schema instance is used for writing and for reading back
+     TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema =
+             new TypeInformationKeyValueSerializationSchema<>(byte[].class, PojoValue.class, env.getConfig());
+
+     Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+     producerProperties.setProperty("retries", "3");
+     producerProperties.putAll(secureProps);
+     kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
+
+     env.execute("Write deletes to Kafka");
+
+     // ----------- Read the data again -------------------
+
+     env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env.getConfig().disableSysoutLogging();
+     env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+     env.setParallelism(1);
+
+     Properties consumerProps = new Properties();
+     consumerProps.putAll(standardProps);
+     consumerProps.putAll(secureProps);
+     DataStream<Tuple2<byte[], PojoValue>> fromKafka =
+             env.addSource(kafkaServer.getConsumer(topic, schema, consumerProps));
+
+     fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
+         long counter = 0;
+
+         @Override
+         public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception {
+             // ensure that deleted messages are passed as nulls
+             assertNull(value.f1);
+             counter++;
+             if (counter != ELEMENT_COUNT) {
+                 return;
+             }
+             // we got the right number of elements
+             throw new SuccessException();
+         }
+     });
+
+     tryExecute(env, "Read deletes from Kafka");
+
+     deleteTestTopic(topic);
+ }
+
+ /**
+  * Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated.
+  *
+  * <p>A sequence of {@code ELEMENT_COUNT} records is written first. The reading job uses a
+  * {@link FixedNumberDeserializationSchema} that reports end-of-stream after
+  * {@code ELEMENT_COUNT} records, which is the only thing that ends the job in this test.
+  *
+  * @throws Exception if writing the sequence or the reading job fails
+  */
+ public void runEndOfStreamTest() throws Exception {
+
+     final int ELEMENT_COUNT = 300;
+     final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1);
+
+     // read using custom schema
+     final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+     env1.setParallelism(1);
+     env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+     env1.getConfig().disableSysoutLogging();
+
+     Properties props = new Properties();
+     props.putAll(standardProps);
+     props.putAll(secureProps);
+
+     DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props));
+     fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
+         @Override
+         public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+             // noop ;)
+         }
+     });
+
+     // the execution result is not inspected
+     tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
+
+     deleteTestTopic(topic);
+ }
+
+ /**
+ * Test metrics reporting for consumer
+ *
+ * @throws Exception
+ */
+ public void runMetricsTest() throws Throwable {
+
+ // create a stream with 5 topics
+ final String topic = "metricsStream";
+ createTestTopic(topic, 5, 1);
+
+ final Tuple1<Throwable> error = new Tuple1<>(null);
+ Runnable job = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // start job writing & reading data.
+ final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env1.setParallelism(1);
+ env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env1.getConfig().disableSysoutLogging();
+ env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+ Properties props = new Properties();
+ props.putAll(standardProps);
+ props.putAll(secureProps);
+
+ TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+ DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+ fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+ @Override
+ public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
+ }
+ });
+
+ DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+ boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+ int i = 0;
+ while (running) {
+ ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+ Thread.sleep(1);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+
+ env1.execute("Metrics test job");
+ } catch(Throwable t) {
+ LOG.warn("Got exception during execution", t);
+ if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+ error.f0 = t;
+ }
+ }
+ }
+ };
+ Thread jobThread = new Thread(job);
+ jobThread.start();
+
+ try {
+ // connect to JMX
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ // wait until we've found all 5 offset metrics
+ Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
+ while (offsetMetrics.size() < 5) { // test will time out if metrics are not properly working
+ if (error.f0 != null) {
+ // fail test early
+ throw error.f0;
+ }
+ offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
+ Thread.sleep(50);
+ }
+ Assert.assertEquals(5, offsetMetrics.size());
+ // we can't rely on the consumer to have touched all the partitions already
+ // that's why we'll wait until all five partitions have a positive offset.
+ // The test will fail if we never meet the condition
+ while (true) {
+ int numPosOffsets = 0;
+ // check that offsets are correctly reported
+ for (ObjectName object : offsetMetrics) {
+ Object offset = mBeanServer.getAttribute(object, "Value");
+ if((long) offset >= 0) {
+ numPosOffsets++;
+ }
+ }
+ if (numPosOffsets == 5) {
+ break;
+ }
+ // wait for the consumer to consume on all partitions
+ Thread.sleep(50);
+ }
+
+ // check if producer metrics are also available.
+ Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null);
+ Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30);
+
+
+ LOG.info("Found all JMX metrics. Cancelling job.");
+ } finally {
+ // cancel
+ JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ }
+
+ while (jobThread.isAlive()) {
+ Thread.sleep(50);
+ }
+ if (error.f0 != null) {
+ throw error.f0;
+ }
+
+ deleteTestTopic(topic);
+ }
+
+
+ /**
+  * Deserialization schema for {@code Tuple2<Integer, Integer>} records that reports
+  * end-of-stream once {@code isEndOfStream} has been called {@code finalCount} times.
+  */
+ public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
+
+     /** Number of {@code isEndOfStream} calls after which the stream is reported as ended. */
+     final int finalCount;
+
+     /** Number of {@code isEndOfStream} calls seen so far. */
+     int count = 0;
+
+     TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+     TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
+
+     public FixedNumberDeserializationSchema(int finalCount) {
+         this.finalCount = finalCount;
+     }
+
+     @Override
+     public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
+         return ti;
+     }
+
+     @Override
+     public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
+         ByteArrayInputStream bytes = new ByteArrayInputStream(message);
+         DataInputView view = new DataInputViewStreamWrapper(bytes);
+         return ser.deserialize(view);
+     }
+
+     @Override
+     public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
+         // every call counts, independent of the element that is passed in
+         count++;
+         return count >= finalCount;
+     }
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Reading and writing test data sets
+ // ------------------------------------------------------------------------
+
+ /**
+  * Runs a job using the provided environment to read a sequence of records from a single Kafka topic.
+  * The method allows to individually specify the expected starting offset and total read value count of each partition.
+  * The job will be considered successful only if all partition read results match the start offset and value count criteria.
+  *
+  * <p>Note that {@code secureProps} is added to the passed properties object {@code cc}.
+  *
+  * @param env the environment used to build and run the reading job
+  * @param cc the consumer properties; modified by this method (see above)
+  * @param topicName the topic to read from
+  * @param partitionsToValuesCountAndStartOffset per partition, the expected number of values
+  *        ({@code f0}) and the value the sequence is expected to start at ({@code f1})
+  * @throws Exception if the job fails or the read values do not match the expectation
+  */
+ protected void readSequence(StreamExecutionEnvironment env, Properties cc,
+         final String topicName,
+         final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
+     // one source subtask per expected partition
+     final int sourceParallelism = partitionsToValuesCountAndStartOffset.size();
+
+     int finalCountTmp = 0;
+     for (Tuple2<Integer, Integer> valuesCountAndStartOffset : partitionsToValuesCountAndStartOffset.values()) {
+         finalCountTmp += valuesCountAndStartOffset.f0;
+     }
+     final int finalCount = finalCountTmp;
+
+     final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+     final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
+             new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
+
+     // create the consumer
+     cc.putAll(secureProps);
+     FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
+
+     DataStream<Tuple2<Integer, Integer>> source = env
+             .addSource(consumer).setParallelism(sourceParallelism)
+             .map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
+
+     // verify data
+     source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+
+         // per partition, one bit for every value seen (relative to the start offset)
+         private HashMap<Integer, BitSet> partitionsToValueCheck;
+         private int count = 0;
+
+         @Override
+         public void open(Configuration parameters) throws Exception {
+             partitionsToValueCheck = new HashMap<>();
+             for (Integer partition : partitionsToValuesCountAndStartOffset.keySet()) {
+                 partitionsToValueCheck.put(partition, new BitSet());
+             }
+         }
+
+         @Override
+         public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+             int partition = value.f0;
+             int val = value.f1;
+
+             BitSet bitSet = partitionsToValueCheck.get(partition);
+             if (bitSet == null) {
+                 throw new RuntimeException("Got a record from an unknown partition");
+             } else {
+                 bitSet.set(val - partitionsToValuesCountAndStartOffset.get(partition).f1);
+             }
+
+             count++;
+
+             LOG.info("Received message {}, total {} messages", value, count);
+
+             // verify if we've seen everything
+             if (count == finalCount) {
+                 for (Map.Entry<Integer, BitSet> partitionCheck : partitionsToValueCheck.entrySet()) {
+                     BitSet check = partitionCheck.getValue();
+                     int expectedValueCount = partitionsToValuesCountAndStartOffset.get(partitionCheck.getKey()).f0;
+
+                     if (check.cardinality() != expectedValueCount) {
+                         throw new RuntimeException("Expected cardinality to be " + expectedValueCount +
+                                 ", but was " + check.cardinality());
+                     } else if (check.nextClearBit(0) != expectedValueCount) {
+                         // the right number of bits is set, but not as one gap-free range
+                         throw new RuntimeException("Expected next clear bit to be " + expectedValueCount +
+                                 ", but was " + check.nextClearBit(0));
+                     }
+                 }
+
+                 // test has passed
+                 throw new SuccessException();
+             }
+         }
+
+     }).setParallelism(1);
+
+     tryExecute(env, "Read data from Kafka");
+
+     LOG.info("Successfully read sequence for verification");
+ }
+
+ /**
+ * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, String, Map)} to
+ * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic.
+ */
+ protected void readSequence(StreamExecutionEnvironment env, Properties cc,
+ final int sourceParallelism,
+ final String topicName,
+ final int valuesCount, final int startFrom) throws Exception {
+ HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
+ for (int i = 0; i < sourceParallelism; i++) {
+ partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
+ }
+ readSequence(env, cc, topicName, partitionsToValuesCountAndStartOffset);
+ }
+
+ protected String writeSequence(
+ String baseTopicName,
+ final int numElements,
+ final int parallelism,
+ final int replicationFactor) throws Exception
+ {
+ LOG.info("\n===================================\n" +
+ "== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" +
+ "===================================");
+
+ final TypeInformation<Tuple2<Integer, Integer>> resultType =
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
+
+ final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+ new KeyedSerializationSchemaWrapper<>(
+ new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+ final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+ new KeyedDeserializationSchemaWrapper<>(
+ new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+ final int maxNumAttempts = 10;
+
+ for (int attempt = 1; attempt <= maxNumAttempts; attempt++) {
+
+ final String topicName = baseTopicName + '-' + attempt;
+
+ LOG.info("Writing attempt #1");
+
+ // -------- Write the Sequence --------
+
+ createTestTopic(topicName, parallelism, replicationFactor);
+
+ StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ writeEnv.getConfig().disableSysoutLogging();
+
+ DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+ private boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+ int cnt = 0;
+ int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+ while (running && cnt < numElements) {
+ ctx.collect(new Tuple2<>(partition, cnt));
+ cnt++;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }).setParallelism(parallelism);
+
+ // the producer must not produce duplicates
+ Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+ producerProperties.setProperty("retries", "0");
+ producerProperties.putAll(secureProps);
+
+ kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism))
+ .setParallelism(parallelism);
+
+ try {
+ writeEnv.execute("Write sequence");
+ }
+ catch (Exception e) {
+ LOG.error("Write attempt failed, trying again", e);
+ deleteTestTopic(topicName);
+ JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+ continue;
+ }
+
+ LOG.info("Finished writing sequence");
+
+ // -------- Validate the Sequence --------
+
+ // we need to validate the sequence, because kafka's producers are not exactly once
+ LOG.info("Validating sequence");
+
+ JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+
+ final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ readEnv.getConfig().disableSysoutLogging();
+ readEnv.setParallelism(parallelism);
+
+ Properties readProps = (Properties) standardProps.clone();
+ readProps.setProperty("group.id", "flink-tests-validator");
+ readProps.putAll(secureProps);
+ FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
+
+ readEnv
+ .addSource(consumer)
+ .map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+
+ private final int totalCount = parallelism * numElements;
+ private int count = 0;
+
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ if (++count == totalCount) {
+ throw new SuccessException();
+ } else {
+ return value;
+ }
+ }
+ }).setParallelism(1)
+ .addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
+
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ Thread runner = new Thread() {
+ @Override
+ public void run() {
+ try {
+ tryExecute(readEnv, "sequence validation");
+ } catch (Throwable t) {
+ errorRef.set(t);
+ }
+ }
+ };
+ runner.start();
+
+ final long deadline = System.currentTimeMillis() + 10000;
+ long delay;
+ while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) {
+ runner.join(delay);
+ }
+
+ boolean success;
+
+ if (runner.isAlive()) {
+ // did not finish in time, maybe the producer dropped one or more records and
+ // the validation did not reach the exit point
+ success = false;
+ JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+ }
+ else {
+ Throwable error = errorRef.get();
+ if (error != null) {
+ success = false;
+ LOG.info("Attempt " + attempt + " failed with exception", error);
+ }
+ else {
+ success = true;
+ }
+ }
+
+ JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+
+ if (success) {
+ // everything is good!
+ return topicName;
+ }
+ else {
+ deleteTestTopic(topicName);
+ // fall through the loop
+ }
+ }
+
+ throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
+ }
+
+ // ------------------------------------------------------------------------
+ // Debugging utilities
+ // ------------------------------------------------------------------------
+
+ /**
+  * Reads messages from a topic into a list, only using Kafka code (no Flink consumer involved).
+  *
+  * <p>Exactly one message stream is requested for the topic. Reading stops as soon as
+  * {@code stopAfter} messages were collected, or when the stream iterator reports that no
+  * further element is available. The consumer connector is shut down before the method
+  * returns or throws, so repeated debugging calls do not pile up open connectors.
+  *
+  * <p>NOTE(review): whether the iterator ever reports "no further element" presumably depends
+  * on the consumer timeout settings in {@code config} - confirm before relying on it.
+  *
+  * @param topicName name of the topic to read
+  * @param config configuration used to create the Kafka consumer connector
+  * @param stopAfter number of collected messages after which reading stops
+  * @return the messages in the order in which the stream iterator returned them
+  * @throws RuntimeException if Kafka does not hand out exactly one stream for the topic
+  */
+ private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+     ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+     try {
+         // we request only one stream per consumer instance. Kafka will make sure that each consumer group
+         // will see each message only once.
+         Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+         Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+         if (streams.size() != 1) {
+             throw new RuntimeException("Expected only one message stream but got "+streams.size());
+         }
+         List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+         if (kafkaStreams == null) {
+             throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+         }
+         if (kafkaStreams.size() != 1) {
+             throw new RuntimeException("Requested 1 stream from Kafka, but got "+kafkaStreams.size()+" streams");
+         }
+         LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+         ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+         List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
+         int read = 0;
+         while (iteratorToRead.hasNext()) {
+             read++;
+             result.add(iteratorToRead.next());
+             if (read == stopAfter) {
+                 LOG.info("Read "+read+" elements");
+                 return result;
+             }
+         }
+         return result;
+     }
+     finally {
+         // the connector was previously never shut down; close it on every exit path
+         consumerConnector.shutdown();
+     }
+ }
+
+ /**
+  * Logs the contents of a topic for debugging purposes.
+  *
+  * <p>The messages are fetched via {@code readTopicToList} and each one is deserialized with
+  * the given schema before it is written to the log together with its partition and offset.
+  *
+  * @param topicName name of the topic to print
+  * @param config configuration of the Kafka consumer used for reading
+  * @param deserializationSchema schema that turns the raw message bytes into an object
+  * @param stopAfter number of messages after which reading stops
+  * @throws IOException if a message cannot be deserialized
+  */
+ private static void printTopic(String topicName, ConsumerConfig config,
+         DeserializationSchema<?> deserializationSchema,
+         int stopAfter) throws IOException {
+
+     List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+     LOG.info("Printing contents of topic {} in consumer group {}", topicName, config.groupId());
+
+     for (MessageAndMetadata<byte[], byte[]> message : contents) {
+         Object out = deserializationSchema.deserialize(message.message());
+         // hand the object itself to the logger: calling toString() here would fail with a
+         // NullPointerException if the schema returned null for a message
+         LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out);
+     }
+ }
+
+ private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer)
+ throws IOException
+ {
+ // write the sequence to log for debugging purposes
+ Properties newProps = new Properties(standardProps);
+ newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
+ newProps.setProperty("auto.offset.reset", "smallest");
+ newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
+ newProps.putAll(secureProps);
+
+ ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+ printTopic(topicName, printerConfig, deserializer, elements);
+ }
+
+
+ public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
+ implements Checkpointed<Integer>, CheckpointListener {
+
+ private static final long serialVersionUID = 6334389850158707313L;
+
+ public static volatile boolean killedLeaderBefore;
+ public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+ private final int shutdownBrokerId;
+ private final int failCount;
+ private int numElementsTotal;
+
+ private boolean failer;
+ private boolean hasBeenCheckpointed;
+
+
+ public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
+ this.shutdownBrokerId = shutdownBrokerId;
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+
+ if (!killedLeaderBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ // shut down a Kafka broker
+ KafkaServer toShutDown = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+
+ if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
+ toShutDown = server;
+ break;
+ }
+ }
+
+ if (toShutDown == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
+ + " ; available brokers: " + listOfBrokers.toString());
+ }
+ else {
+ hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+ killedLeaderBefore = true;
+ toShutDown.shutdown();
+ }
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ hasBeenCheckpointed = true;
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return numElementsTotal;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ this.numElementsTotal = state;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
new file mode 100644
index 0000000..c925c8f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.test.util.SuccessException;
+
+
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+ /**
+  * Base class for Kafka producer tests; contains the version independent test logic for
+  * writing into Kafka with a custom partitioner.
+  */
+ @SuppressWarnings("serial")
+ public abstract class KafkaProducerTestBase extends KafkaTestBase {
+
+
+     /**
+      * Runs a job that writes into a topic through a custom partitioner and reads the data back.
+      *
+      * <pre>
+      *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
+      *            /                  |                                       \
+      *           /                   |                                        \
+      * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
+      *           \                   |                                        /
+      *            \                  |                                       /
+      *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
+      * </pre>
+      *
+      * The mapper validates that the values come consistently from the correct Kafka partition.
+      *
+      * The final sink counts the values per partition and throws a {@link SuccessException} once
+      * every partition has contributed at least 100 values. It does not check for duplicates.
+      *
+      * Any exception that escapes the job execution fails the test.
+      */
+     public void runCustomPartitioningTest() {
+         try {
+             LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
+
+             final String topic = "customPartitioningTestTopic";
+             final int parallelism = 3;
+
+             createTestTopic(topic, parallelism, 1);
+
+             TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+
+             StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+             env.setRestartStrategy(RestartStrategies.noRestart());
+             env.getConfig().disableSysoutLogging();
+
+             TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
+                     new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+
+             TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
+                     new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+
+             // ------ producing topology ---------
+
+             // source has DOP 1 to make sure it generates no duplicates
+             DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+
+                 // written by cancel() and read by the loop in run(); volatile makes the
+                 // update visible when the two are invoked from different threads
+                 private volatile boolean running = true;
+
+                 @Override
+                 public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
+                     long cnt = 0;
+                     while (running) {
+                         ctx.collect(new Tuple2<>(cnt, "kafka-" + cnt));
+                         cnt++;
+                     }
+                 }
+
+                 @Override
+                 public void cancel() {
+                     running = false;
+                 }
+             })
+             .setParallelism(1);
+
+             Properties props = new Properties();
+             props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
+             props.putAll(secureProps);
+
+             // sink partitions into
+             kafkaServer.produceIntoKafka(stream, topic,
+                     new KeyedSerializationSchemaWrapper<>(serSchema),
+                     props,
+                     new CustomPartitioner(parallelism)).setParallelism(parallelism);
+
+             // ------ consuming topology ---------
+
+             Properties consumerProps = new Properties();
+             consumerProps.putAll(standardProps);
+             consumerProps.putAll(secureProps);
+             FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
+
+             env.addSource(source).setParallelism(parallelism)
+
+                 // mapper that validates partitioning and maps to partition
+                 .map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
+
+                     // partition of the first value seen by this mapper instance, -1 until then
+                     private int ourPartition = -1;
+
+                     @Override
+                     public Integer map(Tuple2<Long, String> value) {
+                         // same formula as CustomPartitioner.partition(): modulo on the long
+                         // value first, narrowing to int afterwards
+                         int partition = (int) (value.f0 % parallelism);
+                         if (ourPartition != -1) {
+                             assertEquals("inconsistent partitioning", ourPartition, partition);
+                         } else {
+                             ourPartition = partition;
+                         }
+                         return partition;
+                     }
+                 }).setParallelism(parallelism)
+
+                 .addSink(new SinkFunction<Integer>() {
+
+                     private int[] valuesPerPartition = new int[parallelism];
+
+                     @Override
+                     public void invoke(Integer value) throws Exception {
+                         valuesPerPartition[value]++;
+
+                         // finish as soon as every partition delivered at least 100 values
+                         boolean missing = false;
+                         for (int i : valuesPerPartition) {
+                             if (i < 100) {
+                                 missing = true;
+                                 break;
+                             }
+                         }
+                         if (!missing) {
+                             throw new SuccessException();
+                         }
+                     }
+                 }).setParallelism(1);
+
+             tryExecute(env, "custom partitioning test");
+
+             deleteTestTopic(topic);
+
+             LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
+         }
+         catch (Exception e) {
+             e.printStackTrace();
+             fail(e.getMessage());
+         }
+     }
+
+     // ------------------------------------------------------------------------
+
+     /**
+      * Partitioner that assigns a record to the partition {@code f0 % numPartitions} and asserts
+      * that the number of partitions it is called with matches the expected one.
+      */
+     public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+
+         private final int expectedPartitions;
+
+         public CustomPartitioner(int expectedPartitions) {
+             this.expectedPartitions = expectedPartitions;
+         }
+
+
+         @Override
+         public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+             assertEquals(expectedPartitions, numPartitions);
+
+             return (int) (next.f0 % numPartitions);
+         }
+     }
+ }