You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:59 UTC
[09/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
deleted file mode 100644
index d495327..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
- *
- * @param <T> The type of elements produced by the fetcher.
- */
-public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
-
- private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
-
- // ------------------------------------------------------------------------
-
- /** The schema to convert between Kafka's byte messages, and Flink's objects */
- private final KeyedDeserializationSchema<T> deserializer;
-
- /** The handover of data and exceptions between the consumer thread and the task thread */
- private final Handover handover;
-
- /** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher */
- private final KafkaConsumerThread consumerThread;
-
- /** Flag to mark the main work loop as alive */
- private volatile boolean running = true;
-
- // ------------------------------------------------------------------------
-
- public Kafka09Fetcher(
- SourceContext<T> sourceContext,
- List<KafkaTopicPartition> assignedPartitions,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- ProcessingTimeService processingTimeProvider,
- long autoWatermarkInterval,
- ClassLoader userCodeClassLoader,
- boolean enableCheckpointing,
- String taskNameWithSubtasks,
- MetricGroup metricGroup,
- KeyedDeserializationSchema<T> deserializer,
- Properties kafkaProperties,
- long pollTimeout,
- boolean useMetrics) throws Exception
- {
- super(
- sourceContext,
- assignedPartitions,
- watermarksPeriodic,
- watermarksPunctuated,
- processingTimeProvider,
- autoWatermarkInterval,
- userCodeClassLoader,
- useMetrics);
-
- this.deserializer = deserializer;
- this.handover = new Handover();
-
- final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
- addOffsetStateGauge(kafkaMetricGroup);
-
- // if checkpointing is enabled, we are not automatically committing to Kafka.
- kafkaProperties.setProperty(
- ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
- Boolean.toString(!enableCheckpointing));
-
- this.consumerThread = new KafkaConsumerThread(
- LOG,
- handover,
- kafkaProperties,
- subscribedPartitions(),
- kafkaMetricGroup,
- createCallBridge(),
- getFetcherName() + " for " + taskNameWithSubtasks,
- pollTimeout,
- useMetrics);
- }
-
- // ------------------------------------------------------------------------
- // Fetcher work methods
- // ------------------------------------------------------------------------
-
- @Override
- public void runFetchLoop() throws Exception {
- try {
- final Handover handover = this.handover;
-
- // kick off the actual Kafka consumer
- consumerThread.start();
-
- while (running) {
- // this blocks until we get the next records
- // it automatically re-throws exceptions encountered in the consumer thread
- final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
-
- // get the records for each topic partition
- for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-
- List<ConsumerRecord<byte[], byte[]>> partitionRecords =
- records.records(partition.getKafkaPartitionHandle());
-
- for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-
- final T value = deserializer.deserialize(
- record.key(), record.value(),
- record.topic(), record.partition(), record.offset());
-
- if (deserializer.isEndOfStream(value)) {
- // end of stream signaled
- running = false;
- break;
- }
-
- // emit the actual record. this also updates offset state atomically
- // and deals with timestamps and watermark generation
- emitRecord(value, partition, record.offset(), record);
- }
- }
- }
- }
- finally {
- // this signals the consumer thread that no more work is to be done
- consumerThread.shutdown();
- }
-
- // on a clean exit, wait for the runner thread
- try {
- consumerThread.join();
- }
- catch (InterruptedException e) {
- // may be the result of a wake-up interruption after an exception.
- // we ignore this here and only restore the interruption state
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void cancel() {
- // flag the main thread to exit. A thread interrupt will come anyway.
- running = false;
- handover.close();
- consumerThread.shutdown();
- }
-
- // ------------------------------------------------------------------------
- // The below methods are overridden in the 0.10 fetcher, which otherwise
- // reuses most of the 0.9 fetcher behavior
- // ------------------------------------------------------------------------
-
- protected void emitRecord(
- T record,
- KafkaTopicPartitionState<TopicPartition> partition,
- long offset,
- @SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception {
-
- // the 0.9 Fetcher does not try to extract a timestamp
- emitRecord(record, partition, offset);
- }
-
- /**
- * Gets the name of this fetcher, for thread naming and logging purposes.
- */
- protected String getFetcherName() {
- return "Kafka 0.9 Fetcher";
- }
-
- protected KafkaConsumerCallBridge createCallBridge() {
- return new KafkaConsumerCallBridge();
- }
-
- // ------------------------------------------------------------------------
- // Implement Methods of the AbstractFetcher
- // ------------------------------------------------------------------------
-
- @Override
- public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
- return new TopicPartition(partition.getTopic(), partition.getPartition());
- }
-
- @Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
- KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
- Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
-
- for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
- Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
- if (lastProcessedOffset != null) {
- // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
- // This does not affect Flink's checkpoints/saved state.
- long offsetToCommit = lastProcessedOffset + 1;
-
- offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
- partition.setCommittedOffset(offsetToCommit);
- }
- }
-
- // record the work to be committed by the main consumer thread and make sure the consumer notices that
- consumerThread.setOffsetsToCommit(offsetsToCommit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
deleted file mode 100644
index c17aae6..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-
-/**
- * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
- *
- * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
- * for example changing {@code assign(List)} to {@code assign(Collection)}.
- *
- * Because of that, we need two versions whose compiled code goes against different method signatures.
- * Even though the source of subclasses may look identical, the byte code will be different, because they
- * are compiled against different dependencies.
- */
-public class KafkaConsumerCallBridge {
-
- public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
- consumer.assign(topicPartitions);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
deleted file mode 100644
index 9cfa840..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
- * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
- * deserialize and emit the records.
- *
- * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
- * The Kafka consumer code was found to not always handle interrupts well, and to even
- * deadlock in certain situations.
- *
- * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
- * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
- * to the KafkaConsumer calls that change signature.
- */
-public class KafkaConsumerThread extends Thread {
-
- /** Logger for this consumer */
- private final Logger log;
-
- /** The handover of data and exceptions between the consumer thread and the task thread */
- private final Handover handover;
-
- /** The next offsets that this consumer thread should commit */
- private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
-
- /** The configuration for the Kafka consumer */
- private final Properties kafkaProperties;
-
- /** The partitions that this consumer reads from */
- private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
-
- /** We get this from the outside to publish metrics. **/
- private final MetricGroup kafkaMetricGroup;
-
- /** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
- private final KafkaConsumerCallBridge consumerCallBridge;
-
- /** The maximum number of milliseconds to wait for a fetch batch */
- private final long pollTimeout;
-
- /** Flag whether to add Kafka's metrics to the Flink metrics */
- private final boolean useMetrics;
-
- /** Reference to the Kafka consumer, once it is created */
- private volatile KafkaConsumer<byte[], byte[]> consumer;
-
- /** Flag to mark the main work loop as alive */
- private volatile boolean running;
-
- /** Flag tracking whether an offset commit request is still in progress (not yet completed) */
- private volatile boolean commitInProgress;
-
-
- public KafkaConsumerThread(
- Logger log,
- Handover handover,
- Properties kafkaProperties,
- KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
- MetricGroup kafkaMetricGroup,
- KafkaConsumerCallBridge consumerCallBridge,
- String threadName,
- long pollTimeout,
- boolean useMetrics) {
-
- super(threadName);
- setDaemon(true);
-
- this.log = checkNotNull(log);
- this.handover = checkNotNull(handover);
- this.kafkaProperties = checkNotNull(kafkaProperties);
- this.subscribedPartitions = checkNotNull(subscribedPartitions);
- this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
- this.consumerCallBridge = checkNotNull(consumerCallBridge);
- this.pollTimeout = pollTimeout;
- this.useMetrics = useMetrics;
-
- this.nextOffsetsToCommit = new AtomicReference<>();
- this.running = true;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void run() {
- // early exit check
- if (!running) {
- return;
- }
-
- // this is the means to talk to FlinkKafkaConsumer's main thread
- final Handover handover = this.handover;
-
- // This method initializes the KafkaConsumer and guarantees it is torn down properly.
- // This is important, because the consumer has multi-threading issues,
- // including concurrent 'close()' calls.
- final KafkaConsumer<byte[], byte[]> consumer;
- try {
- consumer = new KafkaConsumer<>(kafkaProperties);
- }
- catch (Throwable t) {
- handover.reportError(t);
- return;
- }
-
- // from here on, the consumer is guaranteed to be closed properly
- try {
- // The callback invoked by Kafka once an offset commit is complete
- final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
-
- // tell the consumer which partitions to work with
- consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
-
- // register Kafka's very own metrics in Flink's metric reporters
- if (useMetrics) {
- // register Kafka metrics to Flink
- Map<MetricName, ? extends Metric> metrics = consumer.metrics();
- if (metrics == null) {
- // MapR's Kafka implementation returns null here.
- log.info("Consumer implementation does not support metrics");
- } else {
- // we have Kafka metrics, register them
- for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
- kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
- }
- }
- }
-
- // early exit check
- if (!running) {
- return;
- }
-
- // seek the consumer to the initial offsets
- for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
- if (partition.isOffsetDefined()) {
- log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
- "seeking the consumer to position {}",
- partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
-
- consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
- }
- else {
- // for partitions that do not have offsets restored from a checkpoint/savepoint,
- // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
- // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
-
- long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
-
- log.info("Partition {} has no initial offset; the consumer has position {}, " +
- "so the initial offset will be set to {}",
- partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
-
- // the fetched offset represents the next record to process, so we need to subtract 1 from it
- partition.setOffset(fetchedOffset - 1);
- }
- }
-
- // from now on, external operations may call the consumer
- this.consumer = consumer;
-
- // the latest bulk of records. may carry across the loop if the thread is woken up
- // from blocking on the handover
- ConsumerRecords<byte[], byte[]> records = null;
-
- // main fetch loop
- while (running) {
-
- // check if there is something to commit
- if (!commitInProgress) {
- // get and reset the work to be committed, so we don't repeatedly commit the same offsets
- final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
-
- if (toCommit != null) {
- log.debug("Sending async offset commit request to Kafka broker");
-
- // also record that a commit is already in progress
- // the order here matters! first set the flag, then send the commit command.
- commitInProgress = true;
- consumer.commitAsync(toCommit, offsetCommitCallback);
- }
- }
-
- // get the next batch of records, unless we did not manage to hand the old batch over
- if (records == null) {
- try {
- records = consumer.poll(pollTimeout);
- }
- catch (WakeupException we) {
- continue;
- }
- }
-
- try {
- handover.produce(records);
- records = null;
- }
- catch (Handover.WakeupException e) {
- // fall through the loop
- }
- }
- // end main fetch loop
- }
- catch (Throwable t) {
- // let the main thread know and exit
- // it may be that this exception comes because the main thread closed the handover, in
- // which case the below reporting is irrelevant, but does not hurt either
- handover.reportError(t);
- }
- finally {
- // make sure the handover is closed if it is not already closed or has an error
- handover.close();
-
- // make sure the KafkaConsumer is closed
- try {
- consumer.close();
- }
- catch (Throwable t) {
- log.warn("Error while closing Kafka consumer", t);
- }
- }
- }
-
- /**
- * Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls).
- */
- public void shutdown() {
- running = false;
-
- // We cannot call close() on the KafkaConsumer, because it will actually throw
- // an exception if a concurrent call is in progress
-
- // this wakes up the consumer if it is blocked handing over records
- handover.wakeupProducer();
-
- // this wakes up the consumer if it is blocked in a kafka poll
- if (consumer != null) {
- consumer.wakeup();
- }
- }
-
- /**
- * Tells this thread to commit a set of offsets. This method does not block, the committing
- * operation will happen asynchronously.
- *
- * <p>Only one commit operation may be pending at any time. If the committing takes longer than
- * the frequency with which this method is called, then some commits may be skipped due to being
- * superseded by newer ones.
- *
- * @param offsetsToCommit The offsets to commit
- */
- public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
- // record the work to be committed by the main consumer thread and make sure the consumer notices that
- if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
- log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
- "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
- "This does not compromise Flink's checkpoint integrity.");
- }
-
- // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
- handover.wakeupProducer();
- if (consumer != null) {
- consumer.wakeup();
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
- ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
- for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
- result.add(p.getKafkaPartitionHandle());
- }
- return result;
- }
-
- // ------------------------------------------------------------------------
-
- private class CommitCallback implements OffsetCommitCallback {
-
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
- commitInProgress = false;
-
- if (ex != null) {
- log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
deleted file mode 100644
index 6bdfb48..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
deleted file mode 100644
index 7a82365..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internal.Handover;
-import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.TopicPartition;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-/**
- * Unit tests for the {@link Kafka09Fetcher}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConsumerThread.class)
-public class Kafka09FetcherTest {
-
- @Test
- public void testCommitDoesNotBlock() throws Exception {
-
- // test data
- final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
- final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
- testCommitData.put(testPartition, 11L);
-
- // to synchronize when the consumer is in its blocking method
- final OneShotLatch sync = new OneShotLatch();
-
- // ----- the mock consumer with blocking poll calls ----
- final MultiShotLatch blockerLatch = new MultiShotLatch();
-
- KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
-
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
- sync.trigger();
- blockerLatch.await();
- return ConsumerRecords.empty();
- }
- });
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- blockerLatch.trigger();
- return null;
- }
- }).when(mockConsumer).wakeup();
-
- // make sure the fetcher creates the mock consumer
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- create the test fetcher -----
-
- @SuppressWarnings("unchecked")
- SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
- sourceContext,
- topics,
- null, /* periodic watermark extractor */
- null, /* punctuated watermark extractor */
- new TestProcessingTimeService(),
- 10, /* watermark interval */
- this.getClass().getClassLoader(),
- true, /* checkpointing */
- "task_name",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // wait until the fetcher has reached the method of interest
- sync.await();
-
- // ----- trigger the offset commit -----
-
- final AtomicReference<Throwable> commitError = new AtomicReference<>();
- final Thread committer = new Thread("committer runner") {
- @Override
- public void run() {
- try {
- fetcher.commitInternalOffsetsToKafka(testCommitData);
- } catch (Throwable t) {
- commitError.set(t);
- }
- }
- };
- committer.start();
-
- // ----- ensure that the committer finishes in time -----
- committer.join(30000);
- assertFalse("The committer did not finish in time", committer.isAlive());
-
- // ----- test done, wait till the fetcher is done for a clean shutdown -----
- fetcher.cancel();
- fetcherRunner.join();
-
- // check that there were no errors in the fetcher
- final Throwable fetcherError = error.get();
- if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
- throw new Exception("Exception in the fetcher", fetcherError);
- }
- final Throwable committerError = commitError.get();
- if (committerError != null) {
- throw new Exception("Exception in the committer", committerError);
- }
- }
-
- @Test
- public void ensureOffsetsGetCommitted() throws Exception {
-
- // test data
- final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
- final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
-
- final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
- testCommitData1.put(testPartition1, 11L);
- testCommitData1.put(testPartition2, 18L);
-
- final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
- testCommitData2.put(testPartition1, 19L);
- testCommitData2.put(testPartition2, 28L);
-
- final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
-
-
- // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
-
- final MultiShotLatch blockerLatch = new MultiShotLatch();
-
- KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
-
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
- blockerLatch.await();
- return ConsumerRecords.empty();
- }
- });
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- blockerLatch.trigger();
- return null;
- }
- }).when(mockConsumer).wakeup();
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) {
- @SuppressWarnings("unchecked")
- Map<TopicPartition, OffsetAndMetadata> offsets =
- (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
-
- OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
-
- commitStore.add(offsets);
- callback.onComplete(offsets, null);
-
- return null;
- }
- }).when(mockConsumer).commitAsync(
- Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
-
- // make sure the fetcher creates the mock consumer
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- create the test fetcher -----
-
- @SuppressWarnings("unchecked")
- SourceContext<String> sourceContext = mock(SourceContext.class);
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
- sourceContext,
- topics,
- null, /* periodic watermark extractor */
- null, /* punctuated watermark extractor */
- new TestProcessingTimeService(),
- 10, /* watermark interval */
- this.getClass().getClassLoader(),
- true, /* checkpointing */
- "task_name",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // ----- trigger the first offset commit -----
-
- fetcher.commitInternalOffsetsToKafka(testCommitData1);
- Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
-
- for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
- TopicPartition partition = entry.getKey();
- if (partition.topic().equals("test")) {
- assertEquals(42, partition.partition());
- assertEquals(12L, entry.getValue().offset());
- }
- else if (partition.topic().equals("another")) {
- assertEquals(99, partition.partition());
- assertEquals(17L, entry.getValue().offset());
- }
- }
-
- // ----- trigger the second offset commit -----
-
- fetcher.commitInternalOffsetsToKafka(testCommitData2);
- Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
-
- for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
- TopicPartition partition = entry.getKey();
- if (partition.topic().equals("test")) {
- assertEquals(42, partition.partition());
- assertEquals(20L, entry.getValue().offset());
- }
- else if (partition.topic().equals("another")) {
- assertEquals(99, partition.partition());
- assertEquals(27L, entry.getValue().offset());
- }
- }
-
- // ----- test done, wait till the fetcher is done for a clean shutdown -----
- fetcher.cancel();
- fetcherRunner.join();
-
- // check that there were no errors in the fetcher
- final Throwable caughtError = error.get();
- if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
- throw new Exception("Exception in the fetcher", caughtError);
- }
- }
-
- @Test
- public void testCancellationWhenEmitBlocks() throws Exception {
-
- // ----- some test data -----
-
- final String topic = "test-topic";
- final int partition = 3;
- final byte[] payload = new byte[] {1, 2, 3, 4};
-
- final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
- new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
- new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
-
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
- data.put(new TopicPartition(topic, partition), records);
-
- final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
-
- // ----- the test consumer -----
-
- final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
- when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
- @Override
- public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
- return consumerRecords;
- }
- });
-
- whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
-
- // ----- build a fetcher -----
-
- BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
- List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
- KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-
- final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
- sourceContext,
- topics,
- null, /* periodic watermark extractor */
- null, /* punctuated watermark extractor */
- new TestProcessingTimeService(),
- 10, /* watermark interval */
- this.getClass().getClassLoader(),
- true, /* checkpointing */
- "task_name",
- new UnregisteredMetricsGroup(),
- schema,
- new Properties(),
- 0L,
- false);
-
-
- // ----- run the fetcher -----
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final Thread fetcherRunner = new Thread("fetcher runner") {
-
- @Override
- public void run() {
- try {
- fetcher.runFetchLoop();
- } catch (Throwable t) {
- error.set(t);
- }
- }
- };
- fetcherRunner.start();
-
- // wait until the thread started to emit records to the source context
- sourceContext.waitTillHasBlocker();
-
- // now we try to cancel the fetcher, including the interruption usually done on the task thread
- // once it has finished, there must be no more thread blocked on the source context
- fetcher.cancel();
- fetcherRunner.interrupt();
- fetcherRunner.join();
-
- assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
- }
-
- // ------------------------------------------------------------------------
- // test utilities
- // ------------------------------------------------------------------------
-
- private static final class BlockingSourceContext<T> implements SourceContext<T> {
-
- private final ReentrantLock lock = new ReentrantLock();
- private final OneShotLatch inBlocking = new OneShotLatch();
-
- @Override
- public void collect(T element) {
- block();
- }
-
- @Override
- public void collectWithTimestamp(T element, long timestamp) {
- block();
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- block();
- }
-
- @Override
- public Object getCheckpointLock() {
- return new Object();
- }
-
- @Override
- public void close() {}
-
- public void waitTillHasBlocker() throws InterruptedException {
- inBlocking.await();
- }
-
- public boolean isStillBlocking() {
- return lock.isLocked();
- }
-
- @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
- private void block() {
- lock.lock();
- try {
- inBlocking.trigger();
-
- // put this thread to sleep indefinitely
- final Object o = new Object();
- while (true) {
- synchronized (o) {
- o.wait();
- }
- }
- }
- catch (InterruptedException e) {
- // exit cleanly, simply reset the interruption flag
- Thread.currentThread().interrupt();
- }
- finally {
- lock.unlock();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
deleted file mode 100644
index d18e2a9..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-public class Kafka09ITCase extends KafkaConsumerTestBase {
-
- // ------------------------------------------------------------------------
- // Suite of Tests
- // ------------------------------------------------------------------------
-
- @Test(timeout = 60000)
- public void testFailOnNoBroker() throws Exception {
- runFailOnNoBrokerTest();
- }
-
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
- runSimpleConcurrentProducerConsumerTopology();
- }
-
-
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
- runKeyValueTest();
- }
-
- // --- canceling / failures ---
-
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
- runCancelingOnEmptyInputTest();
- }
-
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
- runCancelingOnFullInputTest();
- }
-
- @Test(timeout = 60000)
- public void testFailOnDeploy() throws Exception {
- runFailOnDeployTest();
- }
-
-
- // --- source to partition mappings and exactly once ---
-
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
- runOneToOneExactlyOnceTest();
- }
-
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
- runOneSourceMultiplePartitionsExactlyOnceTest();
- }
-
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
- runMultipleSourcesOnePartitionExactlyOnceTest();
- }
-
- // --- broker failure ---
-
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
- runBrokerFailureTest();
- }
-
- // --- special executions ---
-
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
- runBigRecordTestTopology();
- }
-
- @Test(timeout = 60000)
- public void testMultipleTopics() throws Exception {
- runProduceConsumeMultipleTopics();
- }
-
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
- runAllDeletesTest();
- }
-
- @Test(timeout = 60000)
- public void testEndOfStream() throws Exception {
- runEndOfStreamTest();
- }
-
- @Test(timeout = 60000)
- public void testMetrics() throws Throwable {
- runMetricsTest();
- }
-
- // --- offset committing ---
-
- @Test(timeout = 60000)
- public void testCommitOffsetsToKafka() throws Exception {
- runCommitOffsetsToKafka();
- }
-
- @Test(timeout = 60000)
- public void testStartFromKafkaCommitOffsets() throws Exception {
- runStartFromKafkaCommitOffsets();
- }
-
- @Test(timeout = 60000)
- public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
- runAutoOffsetRetrievalAndCommitToKafka();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
deleted file mode 100644
index 45f70ac..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
-
- @Override
- protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka09JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return kafkaProducer;
- }
- };
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected SerializationSchema<Row> getSerializationSchema() {
- return new JsonRowSerializationSchema(FIELD_NAMES);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
deleted file mode 100644
index 4a75f50..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.util.Properties;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
-
- @Override
- protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
- return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
- return (Class) JsonRowDeserializationSchema.class;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
- return (Class) FlinkKafkaConsumer09.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
deleted file mode 100644
index ae4f5b2..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
- @Test
- public void testCustomPartitioning() {
- runCustomPartitioningTest();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
deleted file mode 100644
index e748537..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/*
- * Kafka Secure Connection (kerberos) IT test case
- */
-public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
-
- protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
-
- @BeforeClass
- public static void prepare() throws IOException, ClassNotFoundException {
- LOG.info("-------------------------------------------------------------------------");
- LOG.info(" Starting Kafka09SecuredRunITCase ");
- LOG.info("-------------------------------------------------------------------------");
-
- SecureTestEnvironment.prepare(tempFolder);
- SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
-
- startClusters(true);
- }
-
- @AfterClass
- public static void shutDownServices() {
- shutdownClusters();
- SecureTestEnvironment.cleanup();
- }
-
-
- //timeout interval is large since in Travis, ZK connection timeout occurs frequently
- //The timeout for the test case is 2 times timeout of ZK connection
- @Test(timeout = 600000)
- public void testMultipleTopics() throws Exception {
- runProduceConsumeMultipleTopics();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 18b2aec..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-/**
- * Tests how {@link FlinkKafkaProducer09} reacts to failures reported by the Kafka producer's
- * send callback.
- *
- * <p>The test runs with the {@link PowerMockRunner} and prepares {@link FlinkKafkaProducerBase}
- * so that the construction of {@link KafkaProducer} can be intercepted and replaced by a mock.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-
- /**
- * Runs two scenarios against a mocked {@link KafkaProducer} whose {@code send(...)} completes
- * every callback with an exception carrying the message "Test error":
- * (1) a producer with default settings, for which processing elements is expected to fail with
- * an exception whose cause message contains "Test error", and
- * (2) a producer with {@code setLogFailuresOnly(true)}, for which processing elements and
- * closing the harness is expected to complete without an exception.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testPropagateExceptions() {
- try {
- // mock kafka producer
- KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-
- // partition setup
- when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
- // returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
- Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
- // failure when trying to send an element
- when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
- .thenAnswer(new Answer<Future<RecordMetadata>>() {
- @Override
- public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
- // the second argument of send(...) is the completion callback; complete it with an error right away
- Callback callback = (Callback) invocation.getArguments()[1];
- callback.onCompletion(null, new Exception("Test error"));
- return null;
- }
- });
-
- // make sure the FlinkKafkaProducer instantiates our mock producer
- // (this stubbing has to be in place before the producers below are created/opened)
- whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-
- // (1) producer that propagates errors
-
- FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
- "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
-
- OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
-
- testHarness.open();
-
- try {
- // two elements are processed; presumably the error reported through the callback only
- // surfaces on a later invocation - TODO confirm against FlinkKafkaProducerBase
- testHarness.processElement(new StreamRecord<>("value"));
- testHarness.processElement(new StreamRecord<>("value"));
- fail("This should fail with an exception");
- }
- catch (Exception e) {
- // the mocked send error is expected to be wrapped as the cause of the thrown exception
- assertNotNull(e.getCause());
- assertNotNull(e.getCause().getMessage());
- assertTrue(e.getCause().getMessage().contains("Test error"));
- }
- // NOTE(review): the harness of scenario (1) is replaced below without being closed
-
- // (2) producer that only logs errors
-
- FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
- "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
- producerLogging.setLogFailuresOnly(true);
-
- testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
-
- testHarness.open();
-
- // none of the following calls is expected to throw, although every send reports an error
- testHarness.processElement(new StreamRecord<>("value"));
- testHarness.processElement(new StreamRecord<>("value"));
-
- testHarness.close();
- }
- catch (Exception e) {
- // any exception that escapes the scenarios above fails the test
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
deleted file mode 100644
index 1802e0c..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
-import kafka.api.PartitionMetadata;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-import java.io.File;
-import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * An implementation of the {@link KafkaTestEnvironment} for Kafka 0.9.
- *
- * <p>Starts a ZooKeeper {@link TestingServer} and one or more {@link KafkaServer} brokers that
- * keep their data in temporary directories, and offers helpers to create and delete topics,
- * to restart brokers and to create Kafka 0.9 consumers and producers for the tests.
- */
-public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
-
-    /** ZooKeeper timeout in milliseconds. 6 seconds is default. Seems to be too small for travis. 30 seconds */
-    private static final int DEFAULT_ZK_TIMEOUT_MS = 30000;
-
-    /** Factor by which the ZooKeeper timeout is increased when running in secure mode. */
-    private static final int SECURE_ZK_TIMEOUT_FACTOR = 15;
-
-    private File tmpZkDir;
-    private File tmpKafkaParent;
-    private List<File> tmpKafkaDirs;
-    private List<KafkaServer> brokers;
-    private TestingServer zookeeper;
-    private String zookeeperConnectionString;
-    private String brokerConnectionString = "";
-    private Properties standardProps;
-    private Properties additionalServerProperties;
-    private boolean secureMode = false;
-    private String zkTimeout = String.valueOf(DEFAULT_ZK_TIMEOUT_MS);
-
-    /**
-     * Returns the comma-separated (and comma-terminated) list of broker addresses that
-     * {@link #prepare(int, Properties, boolean)} assembled.
-     */
-    public String getBrokerConnectionString() {
-        return brokerConnectionString;
-    }
-
-    /**
-     * Returns the client properties that {@link #prepare(int, Properties, boolean)} created.
-     */
-    @Override
-    public Properties getStandardProperties() {
-        return standardProps;
-    }
-
-    @Override
-    public String getVersion() {
-        return "0.9";
-    }
-
-    @Override
-    public List<KafkaServer> getBrokers() {
-        return brokers;
-    }
-
-    @Override
-    public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
-        return new FlinkKafkaConsumer09<>(topics, readSchema, props);
-    }
-
-    /**
-     * Creates a sink operator wrapping a {@link FlinkKafkaProducer09} that flushes on checkpoints.
-     */
-    @Override
-    public <T> StreamSink<T> getProducerSink(
-            String topic,
-            KeyedSerializationSchema<T> serSchema,
-            Properties props,
-            KafkaPartitioner<T> partitioner) {
-        FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
-        prod.setFlushOnCheckpoint(true);
-        return new StreamSink<>(prod);
-    }
-
-    /**
-     * Attaches a {@link FlinkKafkaProducer09} that flushes on checkpoints as sink to the given stream.
-     */
-    @Override
-    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-        FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
-        prod.setFlushOnCheckpoint(true);
-        return stream.addSink(prod);
-    }
-
-    @Override
-    public KafkaOffsetHandler createOffsetHandler(Properties props) {
-        return new KafkaOffsetHandlerImpl(props);
-    }
-
-    /**
-     * Starts a new broker with the given id on that broker's existing log directory and
-     * stores it at the same position of the broker list.
-     */
-    @Override
-    public void restartBroker(int leaderId) throws Exception {
-        brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
-    }
-
-    /**
-     * Looks up the id of the broker leading the first partition of the given topic.
-     * The topic metadata is re-fetched from ZooKeeper (with a short pause in between)
-     * until it is reported without an error code.
-     */
-    @Override
-    public int getLeaderToShutDown(String topic) throws Exception {
-        ZkUtils zkUtils = getZkUtils();
-        try {
-            PartitionMetadata firstPart = null;
-            do {
-                if (firstPart != null) {
-                    LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
-                    // not the first try. Sleep a bit
-                    Thread.sleep(150);
-                }
-
-                Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
-                firstPart = partitionMetadata.head();
-            }
-            while (firstPart.errorCode() != 0);
-
-            return firstPart.leader().get().id();
-        } finally {
-            zkUtils.close();
-        }
-    }
-
-    @Override
-    public int getBrokerId(KafkaServer server) {
-        return server.config().brokerId();
-    }
-
-    @Override
-    public boolean isSecureRunSupported() {
-        return true;
-    }
-
-    /**
-     * Creates the temporary directories, starts ZooKeeper and the Kafka brokers and
-     * builds the standard client properties.
-     *
-     * <p>In secure mode only a single broker is started and the ZooKeeper timeout is increased.
-     * Any failure during start-up fails the test via {@code fail(...)}.
-     *
-     * @param numKafkaServers number of brokers to start (forced to 1 in secure mode)
-     * @param additionalServerProperties extra broker properties, may be {@code null}
-     * @param secureMode whether the brokers are set up with SASL_PLAINTEXT listeners
-     */
-    @Override
-    public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
-
-        // start from a clean slate, so that calling prepare() again on the same instance neither
-        // appends to the previous broker list nor multiplies the timeout once more
-        brokerConnectionString = "";
-        int zkTimeoutMs = DEFAULT_ZK_TIMEOUT_MS;
-
-        //increase the timeout since in Travis ZK connection takes long time for secure connection.
-        if (secureMode) {
-            //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
-            numKafkaServers = 1;
-            zkTimeoutMs *= SECURE_ZK_TIMEOUT_FACTOR;
-        }
-        zkTimeout = String.valueOf(zkTimeoutMs);
-
-        this.additionalServerProperties = additionalServerProperties;
-        this.secureMode = secureMode;
-        File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
-        tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-        assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
-
-        // note: the separator used to be '*', which is not a valid file name character on Windows
-        tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
-        assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
-
-        tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-        for (int i = 0; i < numKafkaServers; i++) {
-            File tmpDir = new File(tmpKafkaParent, "server-" + i);
-            assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
-            tmpKafkaDirs.add(tmpDir);
-        }
-
-        zookeeper = null;
-        brokers = null;
-
-        try {
-            LOG.info("Starting Zookeeper");
-            zookeeper = new TestingServer(-1, tmpZkDir);
-            zookeeperConnectionString = zookeeper.getConnectString();
-            LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
-
-            LOG.info("Starting KafkaServer");
-            brokers = new ArrayList<>(numKafkaServers);
-
-            final SecurityProtocol protocol = secureMode ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
-
-            for (int i = 0; i < numKafkaServers; i++) {
-                KafkaServer broker = getKafkaServer(i, tmpKafkaDirs.get(i));
-                brokers.add(broker);
-
-                SocketServer socketServer = broker.socketServer();
-                brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(protocol)) + ",";
-            }
-
-            LOG.info("ZK and KafkaServer started.");
-        }
-        catch (Throwable t) {
-            LOG.error("Test setup failed", t);
-            fail("Test setup failed: " + t.getMessage());
-        }
-
-        LOG.info("brokerConnectionString --> {}", brokerConnectionString);
-
-        standardProps = new Properties();
-        standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-        standardProps.setProperty("bootstrap.servers", brokerConnectionString);
-        standardProps.setProperty("group.id", "flink-tests");
-        standardProps.setProperty("enable.auto.commit", "false");
-        standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
-        standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
-        standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
-        standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
-    }
-
-    /**
-     * Stops the brokers and ZooKeeper and deletes the temporary directories.
-     * Safe to call even if {@link #prepare(int, Properties, boolean)} failed half-way.
-     */
-    @Override
-    public void shutdown() {
-        // brokers is null if prepare() failed before the broker list was created
-        if (brokers != null) {
-            for (KafkaServer broker : brokers) {
-                if (broker != null) {
-                    broker.shutdown();
-                }
-            }
-            brokers.clear();
-        }
-
-        if (zookeeper != null) {
-            try {
-                zookeeper.stop();
-                zookeeper.close();
-            }
-            catch (Exception e) {
-                LOG.warn("ZK.stop() failed", e);
-            }
-            zookeeper = null;
-        }
-
-        // clean up the temp spaces
-
-        if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-            try {
-                FileUtils.deleteDirectory(tmpKafkaParent);
-            }
-            catch (Exception ignored) {
-                // best-effort clean-up of temporary test data
-            }
-        }
-        if (tmpZkDir != null && tmpZkDir.exists()) {
-            try {
-                FileUtils.deleteDirectory(tmpZkDir);
-            }
-            catch (Exception ignored) {
-                // best-effort clean-up of temporary test data
-            }
-        }
-    }
-
-    /**
-     * Opens a new ZooKeeper connection using the timeouts from the standard properties.
-     * The caller is responsible for closing the returned {@link ZkUtils}.
-     */
-    public ZkUtils getZkUtils() {
-        LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
-        int sessionTimeout = Integer.parseInt(standardProps.getProperty("zookeeper.session.timeout.ms"));
-        int connectionTimeout = Integer.parseInt(standardProps.getProperty("zookeeper.connection.timeout.ms"));
-        ZkClient creator = new ZkClient(zookeeperConnectionString, sessionTimeout, connectionTimeout, new ZooKeeperStringSerializer());
-        return ZkUtils.apply(creator, false);
-    }
-
-    /**
-     * Creates the given topic and waits until ZooKeeper reports it as existing.
-     * Fails the test if the topic does not show up within the ZooKeeper timeout
-     * or if the waiting thread is interrupted.
-     */
-    @Override
-    public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
-        // create topic with one client
-        LOG.info("Creating topic {}", topic);
-
-        ZkUtils zkUtils = getZkUtils();
-        try {
-            AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig);
-        } finally {
-            zkUtils.close();
-        }
-
-        LOG.info("Topic {} create request is successfully posted", topic);
-
-        // validate that the topic has been created
-        final int timeoutMs = Integer.parseInt(zkTimeout);
-        final long deadline = System.currentTimeMillis() + timeoutMs;
-        do {
-            try {
-                if (secureMode) {
-                    //increase wait time since in Travis ZK timeout occurs frequently
-                    int wait = timeoutMs / 100;
-                    LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
-                    Thread.sleep(wait);
-                } else {
-                    Thread.sleep(100);
-                }
-            } catch (InterruptedException e) {
-                // restore interrupted state and stop waiting
-                Thread.currentThread().interrupt();
-                fail("Interrupted while waiting for test topic " + topic + " to be created");
-            }
-
-            LOG.info("Validating if the topic {} has been created or not", topic);
-
-            // create a new ZK utils connection for every check and always release it again
-            ZkUtils checkZKConn = getZkUtils();
-            try {
-                if (AdminUtils.topicExists(checkZKConn, topic)) {
-                    LOG.info("topic {} has been created successfully", topic);
-                    return;
-                }
-            } finally {
-                checkZKConn.close();
-            }
-            LOG.info("topic {} has not been created yet. Will check again...", topic);
-        }
-        while (System.currentTimeMillis() < deadline);
-        fail("Test topic could not be created");
-    }
-
-    /**
-     * Requests the deletion of the given topic.
-     */
-    @Override
-    public void deleteTestTopic(String topic) {
-        ZkUtils zkUtils = getZkUtils();
-        try {
-            LOG.info("Deleting topic {}", topic);
-            AdminUtils.deleteTopic(zkUtils, topic);
-        } finally {
-            zkUtils.close();
-        }
-    }
-
-    /**
-     * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-     *
-     * <p>Starts a broker with the given id on the given log directory. The broker port is picked
-     * from the available ports; on a port conflict the start-up is retried a limited number of times.
-     *
-     * @param brokerId id of the broker to start
-     * @param tmpFolder log directory of the broker
-     * @return the started broker
-     * @throws Exception if the broker could not be started
-     */
-    protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
-        Properties kafkaProperties = new Properties();
-
-        // properties have to be Strings
-        kafkaProperties.put("advertised.host.name", KAFKA_HOST);
-        kafkaProperties.put("broker.id", Integer.toString(brokerId));
-        kafkaProperties.put("log.dir", tmpFolder.toString());
-        kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
-        kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
-        kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
-
-        // for CI stability, increase zookeeper session timeout
-        kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
-        kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
-        if (additionalServerProperties != null) {
-            kafkaProperties.putAll(additionalServerProperties);
-        }
-
-        final int numTries = 5;
-
-        for (int i = 1; i <= numTries; i++) {
-            int kafkaPort = NetUtils.getAvailablePort();
-            kafkaProperties.put("port", Integer.toString(kafkaPort));
-
-            //to support secure kafka cluster
-            if (secureMode) {
-                LOG.info("Adding Kafka secure configurations");
-                kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
-                kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
-                kafkaProperties.putAll(getSecureProperties());
-            }
-
-            KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-            try {
-                scala.Option<String> stringNone = scala.Option.apply(null);
-                KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
-                server.startup();
-                return server;
-            }
-            catch (KafkaException e) {
-                if (e.getCause() instanceof BindException) {
-                    // port conflict, retry...
-                    LOG.info("Port conflict when starting Kafka Broker. Retrying...");
-                }
-                else {
-                    throw e;
-                }
-            }
-        }
-
-        throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
-    }
-
-    /**
-     * Returns the additional properties needed for a secured set-up.
-     * The result is empty when the environment does not run in secure mode.
-     */
-    public Properties getSecureProperties() {
-        Properties prop = new Properties();
-        if (secureMode) {
-            prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-            prop.put("security.protocol", "SASL_PLAINTEXT");
-            prop.put("sasl.kerberos.service.name", "kafka");
-
-            //add special timeout for Travis
-            prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
-            prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
-            prop.setProperty("metadata.fetch.timeout.ms","120000");
-        }
-        return prop;
-    }
-
-    /**
-     * Offset handler that reads committed offsets through a {@link KafkaConsumer}.
-     * Does not need access to the enclosing environment, hence a static nested class.
-     */
-    private static class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
-
-        private final KafkaConsumer<byte[], byte[]> offsetClient;
-
-        public KafkaOffsetHandlerImpl(Properties props) {
-            offsetClient = new KafkaConsumer<>(props);
-        }
-
-        /**
-         * Returns the committed offset of the given partition, or {@code null} if none is committed.
-         */
-        @Override
-        public Long getCommittedOffset(String topicName, int partition) {
-            OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
-            return (committed != null) ? committed.offset() : null;
-        }
-
-        @Override
-        public void close() {
-            offsetClient.close();
-        }
-    }
-
-}