Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:24 UTC
[34/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
new file mode 100644
index 0000000..c17aae6
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The KafkaConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
+ *
+ * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ * for example changing {@code assign(List)} to {@code assign(Collection)}.
+ *
+ * Because of that, we need two versions whose compiled code goes against different method signatures.
+ * Even though the source of subclasses may look identical, the byte code will be different, because they
+ * are compiled against different dependencies.
+ */
+public class KafkaConsumerCallBridge {
+
+ public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
+ consumer.assign(topicPartitions);
+ }
+}
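
A version-specific subclass only has to override assignPartitions() and be compiled
against the matching Kafka client. As an illustrative sketch (the class name below
follows the module's naming scheme but is not part of this commit):

    package org.apache.flink.streaming.connectors.kafka.internal;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.List;

    /**
     * Sketch of a 0.10-style call bridge. The source is identical to the base
     * class, but compiling it against the 0.10 client binds the call to
     * assign(Collection) instead of assign(List), which is exactly the point
     * of the indirection.
     */
    public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {

        @Override
        public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
            // same source, different byte code: resolved against the 0.10 signature
            consumer.assign(topicPartitions);
        }
    }
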
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
new file mode 100644
index 0000000..9cfa840
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
+ * deserialize and emit the records.
+ *
+ * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, and to even
+ * deadlock in certain situations.
+ *
+ * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+ /** Logger for this consumer */
+ private final Logger log;
+
+ /** The handover of data and exceptions between the consumer thread and the task thread */
+ private final Handover handover;
+
+ /** The next offsets that the main thread should commit */
+ private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+ /** The configuration for the Kafka consumer */
+ private final Properties kafkaProperties;
+
+ /** The partitions that this consumer reads from */
+ private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
+
+ /** We get this from the outside to publish metrics */
+ private final MetricGroup kafkaMetricGroup;
+
+ /** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
+ private final KafkaConsumerCallBridge consumerCallBridge;
+
+ /** The maximum number of milliseconds to wait for a fetch batch */
+ private final long pollTimeout;
+
+ /** Flag whether to add Kafka's metrics to the Flink metrics */
+ private final boolean useMetrics;
+
+ /** Reference to the Kafka consumer, once it is created */
+ private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+ /** Flag to mark the main work loop as alive */
+ private volatile boolean running;
+
+ /** Flag tracking whether the latest commit request has completed */
+ private volatile boolean commitInProgress;
+
+
+ public KafkaConsumerThread(
+ Logger log,
+ Handover handover,
+ Properties kafkaProperties,
+ KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
+ MetricGroup kafkaMetricGroup,
+ KafkaConsumerCallBridge consumerCallBridge,
+ String threadName,
+ long pollTimeout,
+ boolean useMetrics) {
+
+ super(threadName);
+ setDaemon(true);
+
+ this.log = checkNotNull(log);
+ this.handover = checkNotNull(handover);
+ this.kafkaProperties = checkNotNull(kafkaProperties);
+ this.subscribedPartitions = checkNotNull(subscribedPartitions);
+ this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+ this.consumerCallBridge = checkNotNull(consumerCallBridge);
+ this.pollTimeout = pollTimeout;
+ this.useMetrics = useMetrics;
+
+ this.nextOffsetsToCommit = new AtomicReference<>();
+ this.running = true;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void run() {
+ // early exit check
+ if (!running) {
+ return;
+ }
+
+ // this is the means to talk to FlinkKafkaConsumer's main thread
+ final Handover handover = this.handover;
+
+ // This method initializes the KafkaConsumer and guarantees it is torn down properly.
+ // This is important, because the consumer has multi-threading issues,
+ // including concurrent 'close()' calls.
+ final KafkaConsumer<byte[], byte[]> consumer;
+ try {
+ consumer = new KafkaConsumer<>(kafkaProperties);
+ }
+ catch (Throwable t) {
+ handover.reportError(t);
+ return;
+ }
+
+ // from here on, the consumer is guaranteed to be closed properly
+ try {
+ // The callback invoked by Kafka once an offset commit is complete
+ final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
+
+ // tell the consumer which partitions to work with
+ consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
+
+ // register Kafka's very own metrics in Flink's metric reporters
+ if (useMetrics) {
+ // register Kafka metrics to Flink
+ Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+ if (metrics == null) {
+ // MapR's Kafka implementation returns null here.
+ log.info("Consumer implementation does not support metrics");
+ } else {
+ // we have Kafka metrics, register them
+ for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+ kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+ }
+ }
+ }
+
+ // early exit check
+ if (!running) {
+ return;
+ }
+
+ // seek the consumer to the initial offsets
+ for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
+ if (partition.isOffsetDefined()) {
+ log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
+ "seeking the consumer to position {}",
+ partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
+
+ consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+ }
+ else {
+ // for partitions that do not have offsets restored from a checkpoint/savepoint,
+ // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
+ // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
+
+ long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
+
+ log.info("Partition {} has no initial offset; the consumer has position {}, " +
+ "so the initial offset will be set to {}",
+ partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+ // the fetched offset represents the next record to process, so we need to subtract 1 from it
+ partition.setOffset(fetchedOffset - 1);
+ }
+ }
+
+ // from now on, external operations may call the consumer
+ this.consumer = consumer;
+
+ // the latest batch of records; it may carry over to the next loop iteration if the thread
+ // is woken up from blocking on the handover
+ ConsumerRecords<byte[], byte[]> records = null;
+
+ // main fetch loop
+ while (running) {
+
+ // check if there is something to commit
+ if (!commitInProgress) {
+ // get and reset the offsets to be committed, so we don't repeatedly commit the same set
+ final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+
+ if (toCommit != null) {
+ log.debug("Sending async offset commit request to Kafka broker");
+
+ // also record that a commit is already in progress
+ // the order here matters! first set the flag, then send the commit command.
+ commitInProgress = true;
+ consumer.commitAsync(toCommit, offsetCommitCallback);
+ }
+ }
+
+ // get the next batch of records, unless the previous batch has not been handed over yet
+ if (records == null) {
+ try {
+ records = consumer.poll(pollTimeout);
+ }
+ catch (WakeupException we) {
+ continue;
+ }
+ }
+
+ try {
+ handover.produce(records);
+ records = null;
+ }
+ catch (Handover.WakeupException e) {
+ // fall through the loop
+ }
+ }
+ // end main fetch loop
+ }
+ catch (Throwable t) {
+ // let the main thread know and exit
+ // it may be that this exception comes because the main thread closed the handover, in
+ // which case the below reporting is irrelevant, but does not hurt either
+ handover.reportError(t);
+ }
+ finally {
+ // make sure the handover is closed if it is not already closed or has an error
+ handover.close();
+
+ // make sure the KafkaConsumer is closed
+ try {
+ consumer.close();
+ }
+ catch (Throwable t) {
+ log.warn("Error while closing Kafka consumer", t);
+ }
+ }
+ }
+
+ /**
+ * Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls).
+ */
+ public void shutdown() {
+ running = false;
+
+ // We cannot call close() on the KafkaConsumer, because it will actually throw
+ // an exception if a concurrent call is in progress
+
+ // this wakes up the consumer if it is blocked handing over records
+ handover.wakeupProducer();
+
+ // this wakes up the consumer if it is blocked in a Kafka poll
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ }
+
+ /**
+ * Tells this thread to commit a set of offsets. This method does not block; the committing
+ * operation happens asynchronously.
+ *
+ * <p>Only one commit operation may be pending at any time. If the committing takes longer than
+ * the frequency with which this method is called, then some commits may be skipped due to being
+ * superseded by newer ones.
+ *
+ * @param offsetsToCommit The offsets to commit
+ */
+ public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+ // record the work to be committed by the main consumer thread and make sure the consumer notices that
+ if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+ log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+ "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
+ "This does not compromise Flink's checkpoint integrity.");
+ }
+
+ // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
+ handover.wakeupProducer();
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+ ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+ for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+ result.add(p.getKafkaPartitionHandle());
+ }
+ return result;
+ }
+
+ // ------------------------------------------------------------------------
+
+ private class CommitCallback implements OffsetCommitCallback {
+
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
+ commitInProgress = false;
+
+ if (ex != null) {
+ log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+ }
+ }
+ }
+}
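
The Handover used above is essentially a size-one blocking exchange between the
consumer thread (producer side) and the fetcher (consumer side), with a wakeup
mechanism that avoids Thread.interrupt(). A minimal sketch of that idea, assuming
a simplified API (Flink's actual Handover additionally supports error reporting
and closing):

    /**
     * Sketch of a single-slot handover: produce() blocks while the slot is
     * full, pollNext() blocks while it is empty, and wakeupProducer() unblocks
     * a blocked producer without interrupting the thread.
     */
    public class SimpleHandover<T> {

        private final Object lock = new Object();
        private T element;
        private boolean producerWakeup;

        /** Hands one element over; blocks until the previous one was taken. */
        public void produce(T next) throws InterruptedException, WakeupException {
            synchronized (lock) {
                while (element != null && !producerWakeup) {
                    lock.wait();
                }
                if (producerWakeup) {
                    producerWakeup = false;
                    throw new WakeupException();
                }
                element = next;
                lock.notifyAll();
            }
        }

        /** Takes the next element; blocks until one is available. */
        public T pollNext() throws InterruptedException {
            synchronized (lock) {
                while (element == null) {
                    lock.wait();
                }
                T result = element;
                element = null;
                lock.notifyAll();
                return result;
            }
        }

        /** Wakes up a producer blocked in produce(), without Thread.interrupt(). */
        public void wakeupProducer() {
            synchronized (lock) {
                producerWakeup = true;
                lock.notifyAll();
            }
        }

        /** Thrown to a producer that was woken up instead of interrupted. */
        public static final class WakeupException extends Exception {}
    }
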
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
new file mode 100644
index 0000000..7a82365
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Kafka09FetcherTest {
+
+ @Test
+ public void testCommitDoesNotBlock() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+ final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+ testCommitData.put(testPartition, 11L);
+
+ // to synchronize when the consumer is in its blocking method
+ final OneShotLatch sync = new OneShotLatch();
+
+ // ----- the mock consumer with blocking poll calls ----
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ sync.trigger();
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+ sourceContext,
+ topics,
+ null, /* periodic watermark extractor */
+ null, /* punctuated watermark extractor */
+ new TestProcessingTimeService(),
+ 10, /* watermark interval */
+ this.getClass().getClassLoader(),
+ true, /* checkpointing */
+ "task_name",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ false);
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // wait until the fetcher has reached the method of interest
+ sync.await();
+
+ // ----- trigger the offset commit -----
+
+ final AtomicReference<Throwable> commitError = new AtomicReference<>();
+ final Thread committer = new Thread("committer runner") {
+ @Override
+ public void run() {
+ try {
+ fetcher.commitInternalOffsetsToKafka(testCommitData);
+ } catch (Throwable t) {
+ commitError.set(t);
+ }
+ }
+ };
+ committer.start();
+
+ // ----- ensure that the committer finishes in time -----
+ committer.join(30000);
+ assertFalse("The committer did not finish in time", committer.isAlive());
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable fetcherError = error.get();
+ if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
+ throw new Exception("Exception in the fetcher", fetcherError);
+ }
+ final Throwable committerError = commitError.get();
+ if (committerError != null) {
+ throw new Exception("Exception in the committer", committerError);
+ }
+ }
+
+ @Test
+ public void ensureOffsetsGetCommitted() throws Exception {
+
+ // test data
+ final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+ final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+ final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+ testCommitData1.put(testPartition1, 11L);
+ testCommitData1.put(testPartition2, 18L);
+
+ final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+ testCommitData2.put(testPartition1, 19L);
+ testCommitData2.put(testPartition2, 28L);
+
+ final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+
+ // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+ final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+ KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+ blockerLatch.await();
+ return ConsumerRecords.empty();
+ }
+ });
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ blockerLatch.trigger();
+ return null;
+ }
+ }).when(mockConsumer).wakeup();
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ Map<TopicPartition, OffsetAndMetadata> offsets =
+ (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+ OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+ commitStore.add(offsets);
+ callback.onComplete(offsets, null);
+
+ return null;
+ }
+ }).when(mockConsumer).commitAsync(
+ Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+ // make sure the fetcher creates the mock consumer
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- create the test fetcher -----
+
+ @SuppressWarnings("unchecked")
+ SourceContext<String> sourceContext = mock(SourceContext.class);
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+ sourceContext,
+ topics,
+ null, /* periodic watermark extractor */
+ null, /* punctuated watermark extractor */
+ new TestProcessingTimeService(),
+ 10, /* watermark interval */
+ this.getClass().getClassLoader(),
+ true, /* checkpointing */
+ "task_name",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ false);
+
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // ----- trigger the first offset commit -----
+
+ fetcher.commitInternalOffsetsToKafka(testCommitData1);
+ Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(12L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(17L, entry.getValue().offset());
+ }
+ }
+
+ // ----- trigger the second offset commit -----
+
+ fetcher.commitInternalOffsetsToKafka(testCommitData2);
+ Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+ for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+ TopicPartition partition = entry.getKey();
+ if (partition.topic().equals("test")) {
+ assertEquals(42, partition.partition());
+ assertEquals(20L, entry.getValue().offset());
+ }
+ else if (partition.topic().equals("another")) {
+ assertEquals(99, partition.partition());
+ assertEquals(27L, entry.getValue().offset());
+ }
+ }
+
+ // ----- test done, wait till the fetcher is done for a clean shutdown -----
+ fetcher.cancel();
+ fetcherRunner.join();
+
+ // check that there were no errors in the fetcher
+ final Throwable caughtError = error.get();
+ if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
+ throw new Exception("Exception in the fetcher", caughtError);
+ }
+ }
+
+ @Test
+ public void testCancellationWhenEmitBlocks() throws Exception {
+
+ // ----- some test data -----
+
+ final String topic = "test-topic";
+ final int partition = 3;
+ final byte[] payload = new byte[] {1, 2, 3, 4};
+
+ final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+ data.put(new TopicPartition(topic, partition), records);
+
+ final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+ // ----- the test consumer -----
+
+ final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+ return consumerRecords;
+ }
+ });
+
+ whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- build a fetcher -----
+
+ BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+ List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+ KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+ sourceContext,
+ topics,
+ null, /* periodic watermark extractor */
+ null, /* punctuated watermark extractor */
+ new TestProcessingTimeService(),
+ 10, /* watermark interval */
+ this.getClass().getClassLoader(),
+ true, /* checkpointing */
+ "task_name",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ false);
+
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // wait until the thread started to emit records to the source context
+ sourceContext.waitTillHasBlocker();
+
+ // now we try to cancel the fetcher, including the interruption usually done on the task thread
+ // once it has finished, there must be no more thread blocked on the source context
+ fetcher.cancel();
+ fetcherRunner.interrupt();
+ fetcherRunner.join();
+
+ assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+ }
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+
+ private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final OneShotLatch inBlocking = new OneShotLatch();
+
+ @Override
+ public void collect(T element) {
+ block();
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ block();
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ block();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return new Object();
+ }
+
+ @Override
+ public void close() {}
+
+ public void waitTillHasBlocker() throws InterruptedException {
+ inBlocking.await();
+ }
+
+ public boolean isStillBlocking() {
+ return lock.isLocked();
+ }
+
+ @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+ private void block() {
+ lock.lock();
+ try {
+ inBlocking.trigger();
+
+ // put this thread to sleep indefinitely
+ final Object o = new Object();
+ while (true) {
+ synchronized (o) {
+ o.wait();
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ // exit cleanly, simply reset the interruption flag
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+}
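
The offset arithmetic exercised by ensureOffsetsGetCommitted() follows Kafka's
convention: a committed offset denotes the next record to read, while Flink's
internal state stores the last record it processed. That is why a checkpointed
offset of 11 for the subscribed partition must arrive at the broker as 12, and
why KafkaConsumerThread seeks to offset + 1 on restore and stores position() - 1
otherwise. A small worked sketch of the conversion (the names here are
illustrative, not Flink API):

    /** Illustrative helpers for the offset convention verified above. */
    public final class OffsetConventions {

        /** Flink stores the last processed offset; Kafka commits the next offset to read. */
        public static long toKafkaCommitOffset(long lastProcessedOffset) {
            return lastProcessedOffset + 1;
        }

        /** consumer.position() returns the next record to fetch; Flink stores one less. */
        public static long toFlinkInternalOffset(long kafkaPosition) {
            return kafkaPosition - 1;
        }

        public static void main(String[] args) {
            // matches the test: checkpointed offset 11 for "test"/42 is committed as 12
            System.out.println(toKafkaCommitOffset(11L));   // prints 12
            System.out.println(toFlinkInternalOffset(12L)); // prints 11
        }
    }
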
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
new file mode 100644
index 0000000..d18e2a9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+public class Kafka09ITCase extends KafkaConsumerTestBase {
+
+ // ------------------------------------------------------------------------
+ // Suite of Tests
+ // ------------------------------------------------------------------------
+
+ @Test(timeout = 60000)
+ public void testFailOnNoBroker() throws Exception {
+ runFailOnNoBrokerTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testConcurrentProducerConsumerTopology() throws Exception {
+ runSimpleConcurrentProducerConsumerTopology();
+ }
+
+
+ @Test(timeout = 60000)
+ public void testKeyValueSupport() throws Exception {
+ runKeyValueTest();
+ }
+
+ // --- canceling / failures ---
+
+ @Test(timeout = 60000)
+ public void testCancelingEmptyTopic() throws Exception {
+ runCancelingOnEmptyInputTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testCancelingFullTopic() throws Exception {
+ runCancelingOnFullInputTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testFailOnDeploy() throws Exception {
+ runFailOnDeployTest();
+ }
+
+
+ // --- source to partition mappings and exactly once ---
+
+ @Test(timeout = 60000)
+ public void testOneToOneSources() throws Exception {
+ runOneToOneExactlyOnceTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testOneSourceMultiplePartitions() throws Exception {
+ runOneSourceMultiplePartitionsExactlyOnceTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSourcesOnePartition() throws Exception {
+ runMultipleSourcesOnePartitionExactlyOnceTest();
+ }
+
+ // --- broker failure ---
+
+ @Test(timeout = 60000)
+ public void testBrokerFailure() throws Exception {
+ runBrokerFailureTest();
+ }
+
+ // --- special executions ---
+
+ @Test(timeout = 60000)
+ public void testBigRecordJob() throws Exception {
+ runBigRecordTestTopology();
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleTopics() throws Exception {
+ runProduceConsumeMultipleTopics();
+ }
+
+ @Test(timeout = 60000)
+ public void testAllDeletes() throws Exception {
+ runAllDeletesTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testEndOfStream() throws Exception {
+ runEndOfStreamTest();
+ }
+
+ @Test(timeout = 60000)
+ public void testMetrics() throws Throwable {
+ runMetricsTest();
+ }
+
+ // --- offset committing ---
+
+ @Test(timeout = 60000)
+ public void testCommitOffsetsToKafka() throws Exception {
+ runCommitOffsetsToKafka();
+ }
+
+ @Test(timeout = 60000)
+ public void testStartFromKafkaCommitOffsets() throws Exception {
+ runStartFromKafkaCommitOffsets();
+ }
+
+ @Test(timeout = 60000)
+ public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+ runAutoOffsetRetrievalAndCommitToKafka();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
new file mode 100644
index 0000000..45f70ac
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+ @Override
+ protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+ final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+ return new Kafka09JsonTableSink(topic, properties, partitioner) {
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+ SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+ return kafkaProducer;
+ }
+ };
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected SerializationSchema<Row> getSerializationSchema() {
+ return new JsonRowSerializationSchema(FIELD_NAMES);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
new file mode 100644
index 0000000..4a75f50
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+ @Override
+ protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+ return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+ return (Class) JsonRowDeserializationSchema.class;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer09.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
new file mode 100644
index 0000000..ae4f5b2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class Kafka09ProducerITCase extends KafkaProducerTestBase {
+
+ @Test
+ public void testCustomPartitioning() {
+ runCustomPartitioningTest();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
new file mode 100644
index 0000000..e748537
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/*
+ * Kafka Secure Connection (Kerberos) IT test case
+ */
+public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
+
+ @BeforeClass
+ public static void prepare() throws IOException, ClassNotFoundException {
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting Kafka09SecuredRunITCase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ SecureTestEnvironment.prepare(tempFolder);
+ SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+ startClusters(true);
+ }
+
+ @AfterClass
+ public static void shutDownServices() {
+ shutdownClusters();
+ SecureTestEnvironment.cleanup();
+ }
+
+
+ //the timeout interval is large because ZK connection timeouts occur frequently on Travis
+ //the timeout for the test case is 2 times the ZK connection timeout
+ @Test(timeout = 600000)
+ public void testMultipleTopics() throws Exception {
+ runProduceConsumeMultipleTopics();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..18b2aec
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPropagateExceptions() {
+ try {
+ // mock kafka producer
+ KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+
+ // partition setup
+ when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+ // returning an unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
+ Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+ // failure when trying to send an element
+ when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+ .thenAnswer(new Answer<Future<RecordMetadata>>() {
+ @Override
+ public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+ Callback callback = (Callback) invocation.getArguments()[1];
+ callback.onCompletion(null, new Exception("Test error"));
+ return null;
+ }
+ });
+
+ // make sure the FlinkKafkaProducer instantiates our mock producer
+ whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+
+ // (1) producer that propagates errors
+
+ FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
+ "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
+
+ testHarness.open();
+
+ try {
+ testHarness.processElement(new StreamRecord<>("value"));
+ testHarness.processElement(new StreamRecord<>("value"));
+ fail("This should fail with an exception");
+ }
+ catch (Exception e) {
+ assertNotNull(e.getCause());
+ assertNotNull(e.getCause().getMessage());
+ assertTrue(e.getCause().getMessage().contains("Test error"));
+ }
+
+ // (2) producer that only logs errors
+
+ FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
+ "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+ producerLogging.setLogFailuresOnly(true);
+
+ testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("value"));
+ testHarness.processElement(new StreamRecord<>("value"));
+
+ testHarness.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
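
The two code paths exercised here (propagating vs. only logging) come down to
what the asynchronous send callback does with a failure. A condensed sketch of
that pattern, assuming a simplified sink (the real logic lives in
FlinkKafkaProducerBase; this is not its actual code):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /** Sketch of the async-error handling toggled by setLogFailuresOnly(). */
    public class AsyncErrorSketch {

        private static final Logger LOG = LoggerFactory.getLogger(AsyncErrorSketch.class);

        private volatile Exception asyncException;
        private boolean logFailuresOnly;

        /** The callback handed to KafkaProducer.send(record, callback). */
        final Callback callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    if (logFailuresOnly) {
                        // logging mode: records keep flowing, the error is only logged
                        LOG.error("Error while sending record to Kafka", exception);
                    } else if (asyncException == null) {
                        // propagating mode: remember the first async error ...
                        asyncException = exception;
                    }
                }
            }
        };

        /** Called for every element; rethrows a previously recorded async error. */
        public void checkErroneous() throws Exception {
            Exception e = asyncException;
            if (e != null) {
                asyncException = null;
                // ... and fail the task on the next element, as testPropagateExceptions() expects
                throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
            }
        }
    }
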
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..1802e0c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.api.PartitionMetadata;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the {@link KafkaTestEnvironment} for Kafka 0.9.
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+ private File tmpZkDir;
+ private File tmpKafkaParent;
+ private List<File> tmpKafkaDirs;
+ private List<KafkaServer> brokers;
+ private TestingServer zookeeper;
+ private String zookeeperConnectionString;
+ private String brokerConnectionString = "";
+ private Properties standardProps;
+ private Properties additionalServerProperties;
+ private boolean secureMode = false;
+ // the default of 6 seconds is too small for Travis; use 30 seconds instead
+ private String zkTimeout = "30000";
+
+ public String getBrokerConnectionString() {
+ return brokerConnectionString;
+ }
+
+ @Override
+ public Properties getStandardProperties() {
+ return standardProps;
+ }
+
+ @Override
+ public String getVersion() {
+ return "0.9";
+ }
+
+ @Override
+ public List<KafkaServer> getBrokers() {
+ return brokers;
+ }
+
+ @Override
+ public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+ return new FlinkKafkaConsumer09<>(topics, readSchema, props);
+ }
+
+ @Override
+ public <T> StreamSink<T> getProducerSink(
+ String topic,
+ KeyedSerializationSchema<T> serSchema,
+ Properties props,
+ KafkaPartitioner<T> partitioner) {
+ FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+ prod.setFlushOnCheckpoint(true);
+ return new StreamSink<>(prod);
+ }
+
+ @Override
+ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+ FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+ prod.setFlushOnCheckpoint(true);
+ return stream.addSink(prod);
+ }
+
+ @Override
+ public KafkaOffsetHandler createOffsetHandler(Properties props) {
+ return new KafkaOffsetHandlerImpl(props);
+ }
+
+ @Override
+ public void restartBroker(int leaderId) throws Exception {
+ brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
+ }
+
+ @Override
+ public int getLeaderToShutDown(String topic) throws Exception {
+ ZkUtils zkUtils = getZkUtils();
+ try {
+ PartitionMetadata firstPart = null;
+ do {
+ if (firstPart != null) {
+ LOG.info("Unable to find leader. Error code {}; retrying...", firstPart.errorCode());
+ // not the first try; sleep a bit before retrying
+ Thread.sleep(150);
+ }
+
+ Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
+ firstPart = partitionMetadata.head();
+ }
+ while (firstPart.errorCode() != 0);
+
+ return firstPart.leader().get().id();
+ } finally {
+ zkUtils.close();
+ }
+ }
+
+ @Override
+ public int getBrokerId(KafkaServer server) {
+ return server.config().brokerId();
+ }
+
+ @Override
+ public boolean isSecureRunSupported() {
+ return true;
+ }
+
+ @Override
+ public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+
+ // increase the timeout, since establishing a secure ZK connection takes a long time on Travis
+ if (secureMode) {
+ // run only one Kafka server to avoid multiple ZK connections from many instances (Travis would time out)
+ numKafkaServers = 1;
+ zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
+ }
+
+ this.additionalServerProperties = additionalServerProperties;
+ this.secureMode = secureMode;
+ File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+ tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+ assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+ tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+ assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+ tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+ for (int i = 0; i < numKafkaServers; i++) {
+ File tmpDir = new File(tmpKafkaParent, "server-" + i);
+ assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+ tmpKafkaDirs.add(tmpDir);
+ }
+
+ zookeeper = null;
+ brokers = null;
+
+ try {
+ LOG.info("Starting Zookeeper");
+ zookeeper = new TestingServer(-1, tmpZkDir);
+ zookeeperConnectionString = zookeeper.getConnectString();
+ LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
+
+ LOG.info("Starting KafkaServer");
+ brokers = new ArrayList<>(numKafkaServers);
+
+ for (int i = 0; i < numKafkaServers; i++) {
+ brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+
+ SocketServer socketServer = brokers.get(i).socketServer();
+ if (secureMode) {
+ brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+ } else {
+ brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+ }
+ }
+
+ LOG.info("ZK and KafkaServer started.");
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ fail("Test setup failed: " + t.getMessage());
+ }
+
+ LOG.info("brokerConnectionString --> {}", brokerConnectionString);
+
+ standardProps = new Properties();
+ standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+ standardProps.setProperty("group.id", "flink-tests");
+ standardProps.setProperty("enable.auto.commit", "false");
+ standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+ standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+ standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
+ standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
+ }
+
+ @Override
+ public void shutdown() {
+ for (KafkaServer broker : brokers) {
+ if (broker != null) {
+ broker.shutdown();
+ }
+ }
+ brokers.clear();
+
+ if (zookeeper != null) {
+ try {
+ zookeeper.stop();
+ zookeeper.close();
+ }
+ catch (Exception e) {
+ LOG.warn("ZK.stop() failed", e);
+ }
+ zookeeper = null;
+ }
+
+ // clean up the temp spaces
+
+ if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+ try {
+ FileUtils.deleteDirectory(tmpKafkaParent);
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+ if (tmpZkDir != null && tmpZkDir.exists()) {
+ try {
+ FileUtils.deleteDirectory(tmpZkDir);
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+
+ public ZkUtils getZkUtils() {
+ LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
+ ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+ Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+ return ZkUtils.apply(creator, false);
+ }
+
+ @Override
+ public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+ // create topic with one client
+ LOG.info("Creating topic {}", topic);
+
+ ZkUtils zkUtils = getZkUtils();
+ try {
+ AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig);
+ } finally {
+ zkUtils.close();
+ }
+
+ LOG.info("Topic {} create request is successfully posted", topic);
+
+ // validate that the topic has been created
+ final long deadline = System.currentTimeMillis() + Integer.parseInt(zkTimeout);
+ do {
+ try {
+ if (secureMode) {
+ // increase the wait time, since ZK timeouts occur frequently on Travis
+ int wait = Integer.parseInt(zkTimeout) / 100;
+ LOG.info("waiting for {} ms before topic {} can be checked", wait, topic);
+ Thread.sleep(wait);
+ } else {
+ Thread.sleep(100);
+ }
+
+ } catch (InterruptedException e) {
+ // restore the interrupted state
+ Thread.currentThread().interrupt();
+ }
+ // we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
+ // not always correct.
+
+ LOG.info("Checking whether topic {} has been created", topic);
+
+ // create a new ZK utils connection
+ ZkUtils checkZKConn = getZkUtils();
+ if (AdminUtils.topicExists(checkZKConn, topic)) {
+ LOG.info("topic {} has been created successfully", topic);
+ checkZKConn.close();
+ return;
+ }
+ LOG.info("topic {} has not been created yet. Will check again...", topic);
+ checkZKConn.close();
+ }
+ while (System.currentTimeMillis() < deadline);
+ fail("Test topic could not be created");
+ }
+
+ @Override
+ public void deleteTestTopic(String topic) {
+ ZkUtils zkUtils = getZkUtils();
+ try {
+ LOG.info("Deleting topic {}", topic);
+ AdminUtils.deleteTopic(zkUtils, topic);
+ } finally {
+ zkUtils.close();
+ }
+ }
+
+ /**
+ * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+ */
+ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+ Properties kafkaProperties = new Properties();
+
+ // properties have to be Strings
+ kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+ kafkaProperties.put("broker.id", Integer.toString(brokerId));
+ kafkaProperties.put("log.dir", tmpFolder.toString());
+ kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+ kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+ kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+ // for CI stability, increase zookeeper session timeout
+ kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+ kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
+ if (additionalServerProperties != null) {
+ kafkaProperties.putAll(additionalServerProperties);
+ }
+
+ final int numTries = 5;
+
+ for (int i = 1; i <= numTries; i++) {
+ int kafkaPort = NetUtils.getAvailablePort();
+ kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+ // to support a secure Kafka cluster
+ if (secureMode) {
+ LOG.info("Adding Kafka secure configurations");
+ kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.putAll(getSecureProperties());
+ }
+
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+ try {
+ scala.Option<String> stringNone = scala.Option.apply(null);
+ KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+ server.startup();
+ return server;
+ }
+ catch (KafkaException e) {
+ if (e.getCause() instanceof BindException) {
+ // port conflict, retry...
+ LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+ }
+ else {
+ throw e;
+ }
+ }
+ }
+
+ throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+ }
+
+ public Properties getSecureProperties() {
+ Properties prop = new Properties();
+ if (secureMode) {
+ prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+ prop.put("security.protocol", "SASL_PLAINTEXT");
+ prop.put("sasl.kerberos.service.name", "kafka");
+
+ // add special timeouts for Travis
+ prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+ prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+ prop.setProperty("metadata.fetch.timeout.ms", "120000");
+ }
+ return prop;
+ }
+
+ private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+ private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+ public KafkaOffsetHandlerImpl(Properties props) {
+ offsetClient = new KafkaConsumer<>(props);
+ }
+
+ @Override
+ public Long getCommittedOffset(String topicName, int partition) {
+ OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+ return (committed != null) ? committed.offset() : null;
+ }
+
+ @Override
+ public void close() {
+ offsetClient.close();
+ }
+ }
+
+}
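
Assembled from the methods above, a test would drive this environment roughly as
follows. This is a hypothetical flow for illustration, not a test contained in this
commit; the topic name and partition/replication counts are invented:

    import java.util.Properties;

    public class KafkaTestEnvironmentUsageSketch {
        public static void main(String[] args) throws Exception {
            KafkaTestEnvironmentImpl env = new KafkaTestEnvironmentImpl();

            env.prepare(1, null, false);                 // start ZK and one Kafka 0.9 broker
            env.createTestTopic("test-topic", 2, 1, new Properties()); // 2 partitions, replication factor 1

            Properties props = env.getStandardProperties(); // bootstrap.servers, group.id, ...
            // ... run a job against env.getConsumer(...) or env.produceIntoKafka(...) ...

            env.deleteTestTopic("test-topic");
            env.shutdown();                              // stop brokers and ZK, delete temp dirs
        }
    }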
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
new file mode 100644
index 0000000..25040eb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between the Kafka consumer thread and the fetcher's main thread.
+ */
+public class HandoverTest {
+
+ // ------------------------------------------------------------------------
+ // test producer / consumer
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testWithVariableProducer() throws Exception {
+ runProducerConsumerTest(500, 2, 0);
+ }
+
+ @Test
+ public void testWithVariableConsumer() throws Exception {
+ runProducerConsumerTest(500, 0, 2);
+ }
+
+ @Test
+ public void testWithVariableBoth() throws Exception {
+ runProducerConsumerTest(500, 2, 2);
+ }
+
+ // ------------------------------------------------------------------------
+ // test error propagation
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testPublishErrorOnEmptyHandover() throws Exception {
+ final Handover handover = new Handover();
+
+ Exception error = new Exception();
+ handover.reportError(error);
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Exception e) {
+ assertEquals(error, e);
+ }
+ }
+
+ @Test
+ public void testPublishErrorOnFullHandover() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ IOException error = new IOException();
+ handover.reportError(error);
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Exception e) {
+ assertEquals(error, e);
+ }
+ }
+
+ @Test
+ public void testExceptionMarksClosedOnEmpty() throws Exception {
+ final Handover handover = new Handover();
+
+ IllegalStateException error = new IllegalStateException();
+ handover.reportError(error);
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testExceptionMarksClosedOnFull() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ LinkageError error = new LinkageError();
+ handover.reportError(error);
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test closing behavior
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCloseEmptyForConsumer() throws Exception {
+ final Handover handover = new Handover();
+ handover.close();
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseFullForConsumer() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+ handover.close();
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseEmptyForProducer() throws Exception {
+ final Handover handover = new Handover();
+ handover.close();
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseFullForProducer() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+ handover.close();
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test wake up behavior
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+ Handover handover = new Handover();
+ handover.wakeupProducer();
+
+ // produce into a woken but empty handover
+ try {
+ handover.produce(createTestRecords());
+ }
+ catch (Handover.WakeupException e) {
+ fail();
+ }
+
+ // the handover now holds records; the next wakeup-then-produce
+ // must throw an exception
+ handover.wakeupProducer();
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.WakeupException e) {
+ // expected
+ }
+
+ // empty the handover
+ assertNotNull(handover.pollNext());
+
+ // producing into an empty handover should work
+ try {
+ handover.produce(createTestRecords());
+ }
+ catch (Handover.WakeupException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWakeupWakesOnlyOnce() throws Exception {
+ // create a full handover
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ handover.wakeupProducer();
+
+ try {
+ handover.produce(createTestRecords());
+ fail();
+ } catch (WakeupException e) {
+ // expected
+ }
+
+ CheckedThread producer = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ handover.produce(createTestRecords());
+ }
+ };
+ producer.start();
+
+ // the producer must block
+ producer.waitUntilThreadHoldsLock(10000);
+
+ // release the thread by consuming something
+ assertNotNull(handover.pollNext());
+ producer.sync();
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
+ // generate test data
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final ConsumerRecords<byte[], byte[]>[] data = new ConsumerRecords[numRecords];
+ for (int i = 0; i < numRecords; i++) {
+ data[i] = createTestRecords();
+ }
+
+ final Handover handover = new Handover();
+
+ ProducerThread producer = new ProducerThread(handover, data, maxProducerDelay);
+ ConsumerThread consumer = new ConsumerThread(handover, data, maxConsumerDelay);
+
+ consumer.start();
+ producer.start();
+
+ // sync first on the consumer, so it propagates assertion errors
+ consumer.sync();
+ producer.sync();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static ConsumerRecords<byte[], byte[]> createTestRecords() {
+ return mock(ConsumerRecords.class);
+ }
+
+ // ------------------------------------------------------------------------
+
+ private abstract static class CheckedThread extends Thread {
+
+ private volatile Throwable error;
+
+ public abstract void go() throws Exception;
+
+ @Override
+ public void run() {
+ try {
+ go();
+ }
+ catch (Throwable t) {
+ error = t;
+ }
+ }
+
+ public void sync() throws Exception {
+ join();
+ if (error != null) {
+ ExceptionUtils.rethrowException(error, error.getMessage());
+ }
+ }
+
+ public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
+ final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+
+ while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
+ Thread.sleep(1);
+ }
+
+ if (!isBlockedOrWaiting()) {
+ throw new TimeoutException();
+ }
+ }
+
+ private boolean isBlockedOrWaiting() {
+ State state = getState();
+ return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING;
+ }
+ }
+
+ private static class ProducerThread extends CheckedThread {
+
+ private final Random rnd = new Random();
+ private final Handover handover;
+ private final ConsumerRecords<byte[], byte[]>[] data;
+ private final int maxDelay;
+
+ private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+ this.handover = handover;
+ this.data = data;
+ this.maxDelay = maxDelay;
+ }
+
+ @Override
+ public void go() throws Exception {
+ for (ConsumerRecords<byte[], byte[]> rec : data) {
+ handover.produce(rec);
+
+ if (maxDelay > 0) {
+ int delay = rnd.nextInt(maxDelay);
+ Thread.sleep(delay);
+ }
+ }
+ }
+ }
+
+ private static class ConsumerThread extends CheckedThread {
+
+ private final Random rnd = new Random();
+ private final Handover handover;
+ private final ConsumerRecords<byte[], byte[]>[] data;
+ private final int maxDelay;
+
+ private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+ this.handover = handover;
+ this.data = data;
+ this.maxDelay = maxDelay;
+ }
+
+ @Override
+ public void go() throws Exception {
+ for (ConsumerRecords<byte[], byte[]> rec : data) {
+ ConsumerRecords<byte[], byte[]> next = handover.pollNext();
+
+ assertEquals(rec, next);
+
+ if (maxDelay > 0) {
+ int delay = rnd.nextInt(maxDelay);
+ Thread.sleep(delay);
+ }
+ }
+ }
+ }
+}
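
Taken together, the tests pin down the Handover contract: pollNext() blocks while the
handover is empty, produce() blocks while an element is still pending, reportError()
and close() poison both sides, and wakeupProducer() interrupts at most one blocked
produce() call. A toy sketch of just the blocking exchange — deliberately omitting the
error, close, and wakeup paths, so this is not the connector's actual Handover class:

    // Toy size-one exchange illustrating the blocking behavior the tests verify.
    public class ToyHandover<T> {
        private final Object lock = new Object();
        private T next; // at most one element is ever "in flight"

        public void produce(T element) throws InterruptedException {
            synchronized (lock) {
                while (next != null) {
                    lock.wait();      // full: block until the consumer takes the element
                }
                next = element;
                lock.notifyAll();     // wake a consumer blocked in pollNext()
            }
        }

        public T pollNext() throws InterruptedException {
            synchronized (lock) {
                while (next == null) {
                    lock.wait();      // empty: block until the producer hands something over
                }
                T result = next;
                next = null;
                lock.notifyAll();     // wake a producer blocked in produce()
                return result;
            }
        }
    }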
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..4ac1773
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -0,0 +1,32 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file