Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:26 UTC
[10/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
new file mode 100644
index 0000000..715f5ee
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Array with the partition IDs of the given topicId.
+ * The size of this array is the number of partitions.
+ */
+ private final int[] partitions;
+
+ /**
+ * User defined properties for the Producer
+ */
+ private final Properties producerConfig;
+
+ /**
+ * The name of the topic this producer is writing data to
+ */
+ private final String topicId;
+
+ /**
+ * (Serializable) SerializationSchema for turning objects used with Flink into
+ * byte[] for Kafka.
+ */
+ private final SerializationSchema<IN, byte[]> schema;
+
+ /**
+ * User-provided partitioner for assigning an object to a Kafka partition.
+ */
+ private final KafkaPartitioner partitioner;
+
+ /**
+ * Flag indicating whether to accept failures (and log them), or to fail on failures
+ */
+ private boolean logFailuresOnly;
+
+ // -------------------------------- Runtime fields ------------------------------------------
+
+ /** KafkaProducer instance */
+ private transient KafkaProducer<byte[], byte[]> producer;
+
+ /** The callback that handles error propagation or logging callbacks */
+ private transient Callback callback;
+
+ /** Errors encountered in the async producer are stored here */
+ private transient volatile Exception asyncException;
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+ * the topic.
+ *
+ * @param brokerList
+ * Comma separated addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema.
+ */
+ public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+ * the topic.
+ *
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema.
+ * @param producerConfig
+ * Properties with the producer configuration.
+ */
+ public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
+ this(topicId, serializationSchema, producerConfig, null);
+ }
+
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a Kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+ Preconditions.checkNotNull(topicId, "TopicID not set");
+ Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
+ Preconditions.checkNotNull(producerConfig, "producerConfig not set");
+ ClosureCleaner.ensureSerializable(customPartitioner);
+ ClosureCleaner.ensureSerializable(serializationSchema);
+
+ this.topicId = topicId;
+ this.schema = serializationSchema;
+ this.producerConfig = producerConfig;
+
+ // set the producer configuration properties.
+
+ if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+ } else {
+ LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ }
+
+ if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+ } else {
+ LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ }
+
+
+ // create a local KafkaProducer to get the list of partitions.
+ // this will also ensure locally that all required ProducerConfig values are set.
+ try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
+ List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
+
+ this.partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+ getPartitionsProd.close();
+ }
+
+ if (customPartitioner == null) {
+ this.partitioner = new FixedPartitioner();
+ } else {
+ this.partitioner = customPartitioner;
+ }
+ }
+
+ // ---------------------------------- Properties --------------------------
+
+ /**
+ * Defines whether the producer should fail on errors, or only log them.
+ * If this is set to true, exceptions will only be logged; if set to false,
+ * exceptions will eventually be thrown and cause the streaming program to
+ * fail (and enter recovery).
+ *
+ * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+ // ----------------------------------- Utilities --------------------------
+
+ /**
+ * Initializes the connection to Kafka.
+ */
+ @Override
+ public void open(Configuration configuration) {
+ producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
+
+ RuntimeContext ctx = getRuntimeContext();
+ partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+
+ LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}",
+ ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId);
+
+ if (logFailuresOnly) {
+ callback = new Callback() {
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+ if (e != null) {
+ LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+ }
+ }
+ };
+ }
+ else {
+ callback = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null && asyncException == null) {
+ asyncException = exception;
+ }
+ }
+ };
+ }
+ }
+
+ /**
+ * Called when new data arrives at the sink, and forwards it to Kafka.
+ *
+ * @param next
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN next) throws Exception {
+ // propagate asynchronous errors
+ checkErroneous();
+
+ byte[] serialized = schema.serialize(next);
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicId,
+ partitioner.partition(next, partitions.length),
+ null, serialized);
+
+ producer.send(record, callback);
+ }
+
+
+ @Override
+ public void close() throws Exception {
+ if (producer != null) {
+ producer.close();
+ }
+
+ // make sure we propagate pending errors
+ checkErroneous();
+ }
+
+
+ // ----------------------------------- Utilities --------------------------
+
+ private void checkErroneous() throws Exception {
+ Exception e = asyncException;
+ if (e != null) {
+ // prevent double throwing
+ asyncException = null;
+ throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+ }
+ }
+
+ public static Properties getPropertiesFromBrokerList(String brokerList) {
+ String[] elements = brokerList.split(",");
+ for(String broker: elements) {
+ NetUtils.getCorrectHostnamePort(broker);
+ }
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ return props;
+ }
+}
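
A minimal usage sketch of the producer above, for readers following the move. The broker address, topic name, and the choice of SimpleStringSchema are illustrative assumptions (not part of this commit), and running it needs a reachable Kafka broker because the constructor eagerly fetches the partition list:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaProducerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream = env.fromElements("a", "b", "c");

        // broker list and topic are placeholder values
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("localhost:9092", "example-topic", new SimpleStringSchema());

        // only log produce errors instead of failing the job (default is to fail)
        producer.setLogFailuresOnly(true);

        stream.addSink(producer);
        env.execute("kafka-producer-example");
    }
}
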
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
new file mode 100644
index 0000000..f856926
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.api;
+
+
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+/**
+ * Sink that emits its inputs to a Kafka topic.
+ *
+ * The KafkaSink has been replaced by org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.
+ * This class will be removed in future releases of Flink.
+ */
+@Deprecated
+public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
+ public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+ super(brokerList, topicId, serializationSchema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
new file mode 100644
index 0000000..869c44f
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.persistent;
+
+import kafka.consumer.ConsumerConfig;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+
+/**
+ * Creates a Kafka consumer compatible with reading from Kafka 0.8.1+ brokers.
+ *
+ * This class is provided as a migration path from the old Flink Kafka connectors to the new, updated implementations.
+ *
+ * Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
+ *
+ * @param <T> The type of elements produced by this consumer.
+ */
+@Deprecated
+public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
+
+ private static final long serialVersionUID = -8450689820627198228L;
+
+ /**
+ * Creates a new Kafka 0.8.2.x streaming source consumer.
+ *
+ * @param topic
+ * The name of the topic that should be consumed.
+ * @param valueDeserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param consumerConfig
+ * The consumer config used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ public PersistentKafkaSource(String topic, DeserializationSchema<T> valueDeserializer, ConsumerConfig consumerConfig) {
+ super(topic, valueDeserializer, consumerConfig.props().props(), OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+ }
+}
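
The deprecation note points users at the newer consumers. A hedged migration sketch, assuming the FlinkKafkaConsumer082 variant from this connector keeps the (topic, DeserializationSchema, Properties) constructor and that 'env' is an existing StreamExecutionEnvironment; the connection values are placeholders:

Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:2181");   // placeholder
props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
props.setProperty("group.id", "example-group");             // placeholder

// old style:
// env.addSource(new PersistentKafkaSource<String>("example-topic", new SimpleStringSchema(), new ConsumerConfig(props)));

// new style:
DataStream<String> stream =
        env.addSource(new FlinkKafkaConsumer082<String>("example-topic", new SimpleStringSchema(), props));
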
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
new file mode 100644
index 0000000..4345926
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A fetcher pulls data from Kafka, from a fixed set of partitions.
+ * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset.
+ */
+public interface Fetcher {
+
+ /**
+ * Set which partitions the fetcher should pull from.
+ *
+ * @param partitions The list of partitions for a topic that the fetcher will pull from.
+ */
+ void setPartitionsToRead(List<TopicPartition> partitions);
+
+ /**
+ * Closes the fetcher. This will stop any operation in the
+ * {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually
+ * close underlying connections and release all resources.
+ */
+ void close() throws IOException;
+
+ /**
+ * Starts fetching data from Kafka and emitting it into the stream.
+ *
+ * <p>To provide exactly-once guarantees, the fetcher needs to emit a record and update
+ * the last consumed offset in one atomic operation:</p>
+ * <pre>{@code
+ *
+ * while (running) {
+ * T next = ...
+ * long offset = ...
+ * int partition = ...
+ * synchronized (sourceContext.getCheckpointLock()) {
+ * sourceContext.collect(next);
+ * lastOffsets[partition] = offset;
+ * }
+ * }
+ * }</pre>
+ *
+ * @param sourceContext The source context to emit elements to.
+ * @param valueDeserializer The deserializer to decode the raw values with.
+ * @param lastOffsets The array into which to store the offsets for which elements are emitted.
+ *
+ * @param <T> The type of elements produced by the fetcher and emitted to the source context.
+ */
+ <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer,
+ long[] lastOffsets) throws Exception;
+
+ /**
+ * Set the next offset to read from for the given partition.
+ * For example, if the partition <i>i</i> offset is set to <i>n</i>, the Fetcher's next result
+ * will be the message with <i>offset=n</i>.
+ *
+ * @param topicPartition The partition for which to seek the offset.
+ * @param offsetToRead The offset to seek to.
+ */
+ void seek(TopicPartition topicPartition, long offsetToRead);
+}
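
To make the run() contract above concrete, here is a stripped-down sketch of a Fetcher implementation. The class name and the hard-coded record are invented for illustration; the point is the synchronized block that couples emission and offset update, exactly as the javadoc requires:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.kafka.common.TopicPartition;

import java.util.List;

public class SingleRecordFetcher implements Fetcher {

    private List<TopicPartition> partitions;
    private volatile boolean running = true;

    @Override
    public void setPartitionsToRead(List<TopicPartition> partitions) {
        this.partitions = partitions;
    }

    @Override
    public void seek(TopicPartition topicPartition, long offsetToRead) {
        // a real fetcher would remember the per-partition start offset here
    }

    @Override
    public <T> void run(SourceFunction.SourceContext<T> sourceContext,
            DeserializationSchema<T> valueDeserializer,
            long[] lastOffsets) throws Exception {
        for (TopicPartition tp : partitions) {
            if (!running) {
                return;
            }
            byte[] rawMessage = "hello".getBytes();  // stands in for a message fetched from Kafka
            T value = valueDeserializer.deserialize(rawMessage);
            long offset = 0L;                        // offset of the fake message

            // emit the record and update the stored offset under the same lock,
            // so a checkpoint always sees a consistent (record, offset) pair
            synchronized (sourceContext.getCheckpointLock()) {
                sourceContext.collect(value);
                lastOffsets[tp.partition()] = offset;
            }
        }
    }

    @Override
    public void close() {
        running = false;
    }
}
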
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
new file mode 100644
index 0000000..c4ba103
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This fetcher uses Kafka's low-level API to pull data from a specific
+ * set of partitions and offsets for a certain topic.
+ *
+ * <p>This code is in part based on the tutorial code for the low-level Kafka consumer.</p>
+ */
+public class LegacyFetcher implements Fetcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
+
+ /** The topic from which this fetcher pulls data */
+ private final String topic;
+
+ /** The properties that configure the Kafka connection */
+ private final Properties config;
+
+ /** The task name, to give more readable names to the spawned threads */
+ private final String taskName;
+
+ /** The first error that occurred in a connection thread */
+ private final AtomicReference<Throwable> error;
+
+ /** The partitions that the fetcher should read, with their starting offsets */
+ private Map<TopicPartition, Long> partitionsToRead;
+
+ /** Reference to the thread that executed the run() method. */
+ private volatile Thread mainThread;
+
+ /** Flag to shut the fetcher down */
+ private volatile boolean running = true;
+
+ public LegacyFetcher(String topic, Properties props, String taskName) {
+ this.config = checkNotNull(props, "The config properties cannot be null");
+ this.topic = checkNotNull(topic, "The topic cannot be null");
+ this.taskName = taskName;
+ this.error = new AtomicReference<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // Fetcher methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void setPartitionsToRead(List<TopicPartition> partitions) {
+ partitionsToRead = new HashMap<>(partitions.size());
+ for (TopicPartition tp: partitions) {
+ partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET);
+ }
+ }
+
+ @Override
+ public void seek(TopicPartition topicPartition, long offsetToRead) {
+ if (partitionsToRead == null) {
+ throw new IllegalArgumentException("No partitions to read set");
+ }
+ if (!partitionsToRead.containsKey(topicPartition)) {
+ throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
+ + ") we are not going to read. Partitions to read " + partitionsToRead);
+ }
+ partitionsToRead.put(topicPartition, offsetToRead);
+ }
+
+ @Override
+ public void close() {
+ // flag needs to be checked by the run() method that creates the spawned threads
+ this.running = false;
+
+ // all other cleanup is made by the run method itself
+ }
+
+ @Override
+ public <T> void run(SourceFunction.SourceContext<T> sourceContext,
+ DeserializationSchema<T> valueDeserializer,
+ long[] lastOffsets) throws Exception {
+
+ if (partitionsToRead == null || partitionsToRead.size() == 0) {
+ throw new IllegalArgumentException("No partitions set");
+ }
+
+ // NOTE: This method needs to always release all resources it acquires
+
+ this.mainThread = Thread.currentThread();
+
+ LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
+
+ // get lead broker for each partition
+
+ // NOTE: The kafka client apparently locks itself in an infinite loop sometimes
+ // when it is interrupted, so we run it only in a separate thread.
+ // since it sometimes refuses to shut down, we resort to the admittedly harsh
+ // means of killing the thread after a timeout.
+ PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
+ infoFetcher.start();
+
+ KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
+ watchDog.start();
+
+ final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
+
+ // brokers to fetch partitions from.
+ int fetchPartitionsCount = 0;
+ Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>();
+
+ for (PartitionInfo partitionInfo : allPartitionsInTopic) {
+ if (partitionInfo.leader() == null) {
+ throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
+ + " from topic "+partitionInfo.topic()+" because it does not have a leader");
+ }
+
+ for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
+ final TopicPartition topicPartition = entry.getKey();
+ final long offset = entry.getValue();
+
+ // check if that partition is for us
+ if (topicPartition.partition() == partitionInfo.partition()) {
+ List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
+ if (partitions == null) {
+ partitions = new ArrayList<>();
+ fetchBrokers.put(partitionInfo.leader(), partitions);
+ }
+
+ partitions.add(new FetchPartition(topicPartition.partition(), offset));
+ fetchPartitionsCount++;
+
+ }
+ // else this partition is not for us
+ }
+ }
+
+ if (partitionsToRead.size() != fetchPartitionsCount) {
+ throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
+ + fetchPartitionsCount + " partition infos with lead brokers.");
+ }
+
+ // create SimpleConsumers for each broker
+ ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
+
+ for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
+ final Node broker = brokerInfo.getKey();
+ final List<FetchPartition> partitionsList = brokerInfo.getValue();
+
+ FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
+
+ SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, topic,
+ broker, partitions, sourceContext, valueDeserializer, lastOffsets);
+
+ thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
+ taskName, broker.id(), broker.host(), broker.port()));
+ thread.setDaemon(true);
+ consumers.add(thread);
+ }
+
+ // last check whether we should abort.
+ if (!running) {
+ return;
+ }
+
+ // start all consumer threads
+ for (SimpleConsumerThread<?> t : consumers) {
+ LOG.info("Starting thread {}", t.getName());
+ t.start();
+ }
+
+ // wait until all consumer threads are done, or until we are aborted, or until
+ // an error occurred in one of the fetcher threads
+ try {
+ boolean someConsumersRunning = true;
+ while (running && error.get() == null && someConsumersRunning) {
+ try {
+ // wait for the consumer threads. if an error occurs, we are interrupted
+ for (SimpleConsumerThread<?> t : consumers) {
+ t.join();
+ }
+
+ // safety net
+ someConsumersRunning = false;
+ for (SimpleConsumerThread<?> t : consumers) {
+ someConsumersRunning |= t.isAlive();
+ }
+ }
+ catch (InterruptedException e) {
+ // ignore. we should notice what happened in the next loop check
+ }
+ }
+
+ // make sure any asynchronous error is noticed
+ Throwable error = this.error.get();
+ if (error != null) {
+ throw new Exception(error.getMessage(), error);
+ }
+ }
+ finally {
+ // make sure that in any case (completion, abort, error), all spawned threads are stopped
+ for (SimpleConsumerThread<?> t : consumers) {
+ if (t.isAlive()) {
+ t.cancel();
+ }
+ }
+ }
+ }
+
+ /**
+ * Reports an error from a fetch thread. This will cause the main thread to see this error,
+ * abort, and cancel all other fetch threads.
+ *
+ * @param error The error to report.
+ */
+ void onErrorInFetchThread(Throwable error) {
+ if (this.error.compareAndSet(null, error)) {
+ // we are the first to report an error
+ if (mainThread != null) {
+ mainThread.interrupt();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Representation of a partition to fetch.
+ */
+ private static class FetchPartition {
+
+ /** ID of the partition within the topic (0 indexed, as given by Kafka) */
+ int partition;
+
+ /** Offset pointing at the next element to read from that partition. */
+ long nextOffsetToRead;
+
+ FetchPartition(int partition, long nextOffsetToRead) {
+ this.partition = partition;
+ this.nextOffsetToRead = nextOffsetToRead;
+ }
+
+ @Override
+ public String toString() {
+ return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}';
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Per broker fetcher
+ // ------------------------------------------------------------------------
+
+ /**
+ * Each broker needs its separate connection. This thread implements the connection to
+ * one broker. The connection can fetch multiple partitions from the broker.
+ *
+ * @param <T> The data type fetched.
+ */
+ private static class SimpleConsumerThread<T> extends Thread {
+
+ private final SourceFunction.SourceContext<T> sourceContext;
+ private final DeserializationSchema<T> valueDeserializer;
+ private final long[] offsetsState;
+
+ private final FetchPartition[] partitions;
+
+ private final Node broker;
+ private final String topic;
+ private final Properties config;
+
+ private final LegacyFetcher owner;
+
+ private SimpleConsumer consumer;
+
+ private volatile boolean running = true;
+
+
+ // exceptions are thrown locally
+ public SimpleConsumerThread(LegacyFetcher owner,
+ Properties config, String topic,
+ Node broker,
+ FetchPartition[] partitions,
+ SourceFunction.SourceContext<T> sourceContext,
+ DeserializationSchema<T> valueDeserializer,
+ long[] offsetsState) {
+ this.owner = owner;
+ this.config = config;
+ this.topic = topic;
+ this.broker = broker;
+ this.partitions = partitions;
+ this.sourceContext = checkNotNull(sourceContext);
+ this.valueDeserializer = checkNotNull(valueDeserializer);
+ this.offsetsState = checkNotNull(offsetsState);
+ }
+
+ @Override
+ public void run() {
+ try {
+ // set up the config values
+ final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
+
+ // these are the actual configuration values of Kafka + their original default values.
+ final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
+ final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
+ final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
+ final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
+ final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
+
+ // create the Kafka consumer that we actually use for fetching
+ consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+
+ // make sure that all partitions have some offsets to start with
+ // those partitions that do not have an offset from a checkpoint need to get
+ // their start offset from ZooKeeper
+ {
+ List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
+
+ for (FetchPartition fp : partitions) {
+ if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
+ // retrieve the offset from the consumer
+ partitionsToGetOffsetsFor.add(fp);
+ }
+ }
+ if (partitionsToGetOffsetsFor.size() > 0) {
+ getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+ LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
+ topic, partitionsToGetOffsetsFor);
+ }
+ }
+
+ // Now, the actual work starts :-)
+ int offsetOutOfRangeCount = 0;
+ while (running) {
+ FetchRequestBuilder frb = new FetchRequestBuilder();
+ frb.clientId(clientId);
+ frb.maxWait(maxWait);
+ frb.minBytes(minBytes);
+
+ for (FetchPartition fp : partitions) {
+ frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
+ }
+ kafka.api.FetchRequest fetchRequest = frb.build();
+ LOG.debug("Issuing fetch request {}", fetchRequest);
+
+ FetchResponse fetchResponse;
+ fetchResponse = consumer.fetch(fetchRequest);
+
+ if (fetchResponse.hasError()) {
+ String exception = "";
+ List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
+ for (FetchPartition fp : partitions) {
+ short code = fetchResponse.errorCode(topic, fp.partition);
+
+ if(code == ErrorMapping.OffsetOutOfRangeCode()) {
+ // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
+ // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
+ partitionsToGetOffsetsFor.add(fp);
+ } else if(code != ErrorMapping.NoError()) {
+ exception += "\nException for partition " + fp.partition + ": " +
+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+ }
+ }
+ if (partitionsToGetOffsetsFor.size() > 0) {
+ // safeguard against an infinite loop.
+ if (offsetOutOfRangeCount++ > 0) {
+ throw new RuntimeException("Found invalid offsets more than once in partitions "+partitionsToGetOffsetsFor.toString()+" " +
+ "Exceptions: "+exception);
+ }
+ // get valid offsets for these partitions and try again.
+ LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+ getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+ LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
+ continue; // jump back to create a new fetch request. The offset has not been touched.
+ } else {
+ // all partitions failed on an error
+ throw new IOException("Error while fetching from broker: " + exception);
+ }
+ }
+
+ int messagesInFetch = 0;
+ for (FetchPartition fp : partitions) {
+ final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
+ final int partition = fp.partition;
+
+ for (MessageAndOffset msg : messageSet) {
+ if (running) {
+ messagesInFetch++;
+ if (msg.offset() < fp.nextOffsetToRead) {
+ // we have seen this message already
+ LOG.info("Skipping message with offset " + msg.offset()
+ + " because we have seen messages until " + fp.nextOffsetToRead
+ + " from partition " + fp.partition + " already");
+ continue;
+ }
+
+ ByteBuffer payload = msg.message().payload();
+ byte[] valueByte = new byte[payload.remaining()];
+ payload.get(valueByte);
+
+ final T value = valueDeserializer.deserialize(valueByte);
+ final long offset = msg.offset();
+
+ synchronized (sourceContext.getCheckpointLock()) {
+ sourceContext.collect(value);
+ offsetsState[partition] = offset;
+ }
+
+ // advance offset for the next request
+ fp.nextOffsetToRead = offset + 1;
+ }
+ else {
+ // no longer running
+ return;
+ }
+ }
+ }
+ LOG.debug("This fetch contained {} messages", messagesInFetch);
+ }
+ }
+ catch (Throwable t) {
+ // report to the main thread
+ owner.onErrorInFetchThread(t);
+ }
+ finally {
+ // end of run loop. close connection to consumer
+ if (consumer != null) {
+ // closing the consumer should not fail the program
+ try {
+ consumer.close();
+ }
+ catch (Throwable t) {
+ LOG.error("Error while closing the Kafka simple consumer", t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Cancels this fetch thread. The thread will release all resources and terminate.
+ */
+ public void cancel() {
+ this.running = false;
+
+ // interrupt whatever the consumer is doing
+ if (consumer != null) {
+ consumer.close();
+ }
+
+ this.interrupt();
+ }
+
+ /**
+ * Request latest offsets for a set of partitions, via a Kafka consumer.
+ *
+ * @param consumer The consumer connected to lead broker
+ * @param topic The topic name
+ * @param partitions The list of partitions we need offsets for
+ * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+ */
+ private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
+
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+ for (FetchPartition fp: partitions) {
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+ }
+
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+
+ if (response.hasError()) {
+ String exception = "";
+ for (FetchPartition fp: partitions) {
+ short code;
+ if ( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
+ exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+ }
+ }
+ throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
+ + ". " + exception);
+ }
+
+ for (FetchPartition fp: partitions) {
+ // the resulting offset is the next offset we are going to read
+ // for not-yet-consumed partitions, it is 0.
+ fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
+ }
+ }
+
+ private static long getInvalidOffsetBehavior(Properties config) {
+ long timeType;
+ if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) {
+ timeType = OffsetRequest.LatestTime();
+ } else {
+ timeType = OffsetRequest.EarliestTime();
+ }
+ return timeType;
+ }
+ }
+
+ private static class PartitionInfoFetcher extends Thread {
+
+ private final String topic;
+ private final Properties properties;
+
+ private volatile List<PartitionInfo> result;
+ private volatile Throwable error;
+
+
+ PartitionInfoFetcher(String topic, Properties properties) {
+ this.topic = topic;
+ this.properties = properties;
+ }
+
+ @Override
+ public void run() {
+ try {
+ result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
+ }
+ catch (Throwable t) {
+ this.error = t;
+ }
+ }
+
+ public List<PartitionInfo> getPartitions() throws Exception {
+ try {
+ this.join();
+ }
+ catch (InterruptedException e) {
+ throw new Exception("Partition fetching was cancelled before completion");
+ }
+
+ if (error != null) {
+ throw new Exception("Failed to fetch partitions for topic " + topic, error);
+ }
+ if (result != null) {
+ return result;
+ }
+ throw new Exception("Partition fetching failed");
+ }
+ }
+
+ private static class KillerWatchDog extends Thread {
+
+ private final Thread toKill;
+ private final long timeout;
+
+ private KillerWatchDog(Thread toKill, long timeout) {
+ super("KillerWatchDog");
+ setDaemon(true);
+
+ this.toKill = toKill;
+ this.timeout = timeout;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void run() {
+ final long deadline = System.currentTimeMillis() + timeout;
+ long now;
+
+ while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
+ try {
+ toKill.join(deadline - now);
+ }
+ catch (InterruptedException e) {
+ // ignore here, our job is important!
+ }
+ }
+
+ // this is harsh, but this watchdog is a last resort
+ if (toKill.isAlive()) {
+ toKill.stop();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
new file mode 100644
index 0000000..2a82561
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The offset handler is responsible for locating the initial partition offsets
+ * where the source should start reading, as well as committing offsets from completed
+ * checkpoints.
+ */
+public interface OffsetHandler {
+
+ /**
+ * Commits the given offset for the partitions. May commit the offsets to the Kafka broker,
+ * or to ZooKeeper, based on its configured behavior.
+ *
+ * @param offsetsToCommit The offset to commit, per partition.
+ */
+ void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
+
+ /**
+ * Positions the given fetcher to the initial read offsets where the stream consumption
+ * will start from.
+ *
+ * @param partitions The partitions for which to seek the fetcher to the initial offsets.
+ * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
+ */
+ void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
+
+ /**
+ * Closes the offset handler, releasing all resources.
+ *
+ * @throws IOException Thrown, if the closing fails.
+ */
+ void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
new file mode 100644
index 0000000..a38c3bd
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * Hacky wrapper to send an object instance through a Properties map.
+ *
+ * This works as follows:
+ * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
+ *
+ * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
+ * This is set in the key-value (java.util.Properties) map.
+ * In addition to that, we use Properties.put(Object, Object) to store the instance of the (serializable) partitioner.
+ * This is a hack because the put() method is called on the underlying Hashtable.
+ *
+ * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
+ *
+ * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
+ */
+public class PartitionerWrapper implements Partitioner {
+ public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
+
+ private Partitioner wrapped;
+ public PartitionerWrapper(VerifiableProperties properties) {
+ wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
+ }
+
+ @Override
+ public int partition(Object value, int numberOfPartitions) {
+ return wrapped.partition(value, numberOfPartitions);
+ }
+}
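
For context, a hedged sketch of the producer-side half of this hack. 'partitioner.class' is the old Kafka producer's configuration key, and 'myPartitioner' stands for any serializable Partitioner instance; neither appears in this file:

Properties producerProps = new Properties();

// tell Kafka's old producer to instantiate Flink's wrapper ...
producerProps.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());

// ... and hide the actual (serializable) partitioner instance in the same map;
// Properties.put(Object, Object) accepts it because Properties extends Hashtable
producerProps.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, myPartitioner);
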
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..001b6cb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * Simple ZooKeeper serializer for Strings.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+ private static final Charset CHARSET = Charset.forName("UTF-8");
+
+ @Override
+ public byte[] serialize(Object data) {
+ if (data instanceof String) {
+ return ((String) data).getBytes(CHARSET);
+ }
+ else {
+ throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
+ }
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ else {
+ return new String(bytes, CHARSET);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..42a5951
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.common.TopicAndPartition;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ZookeeperOffsetHandler implements OffsetHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+
+ private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
+
+
+ private final ZkClient zkClient;
+
+ private final String groupId;
+
+
+ public ZookeeperOffsetHandler(Properties props) {
+ this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+
+ if (this.groupId == null) {
+ throw new IllegalArgumentException("Required property '"
+ + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
+ }
+
+ String zkConnect = props.getProperty("zookeeper.connect");
+ if (zkConnect == null) {
+ throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
+ }
+
+ zkClient = new ZkClient(zkConnect,
+ Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
+ Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
+ new ZooKeeperStringSerializer());
+ }
+
+
+ @Override
+ public void commit(Map<TopicPartition, Long> offsetsToCommit) {
+ for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
+ TopicPartition tp = entry.getKey();
+ long offset = entry.getValue();
+
+ if (offset >= 0) {
+ setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
+ }
+ }
+ }
+
+ @Override
+ public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
+ for (TopicPartition tp : partitions) {
+ long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
+
+ if (offset != OFFSET_NOT_SET) {
+ LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
+ tp.partition(), offset);
+
+ // the offset in ZooKeeper is the last read offset; seek() expects the next offset to read.
+ fetcher.seek(tp, offset + 1);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ zkClient.close();
+ }
+
+ // ------------------------------------------------------------------------
+ // Communication with Zookeeper
+ // ------------------------------------------------------------------------
+
+ public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+ TopicAndPartition tap = new TopicAndPartition(topic, partition);
+ ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+ }
+
+ public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
+ TopicAndPartition tap = new TopicAndPartition(topic, partition);
+ ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+
+ scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
+ topicDirs.consumerOffsetDir() + "/" + tap.partition());
+
+ if (data._1().isEmpty()) {
+ return OFFSET_NOT_SET;
+ } else {
+ return Long.valueOf(data._1().get());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..346a7d5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note that one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * # More Flink partitions than Kafka partitions
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ * --> Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * # Fewer Flink partitions than Kafka partitions
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ * </pre>
+ * --> Not all Kafka partitions contain data.
+ * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this
+ * will cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ */
+public class FixedPartitioner extends KafkaPartitioner implements Serializable {
+ private static final long serialVersionUID = 1627268846962918126L;
+
+ int targetPartition = -1;
+
+ @Override
+ public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+ int p = 0;
+ for (int i = 0; i < parallelInstances; i++) {
+ if (i == parallelInstanceId) {
+ targetPartition = partitions[p];
+ return;
+ }
+ if (++p == partitions.length) {
+ p = 0;
+ }
+ }
+ }
+
+ @Override
+ public int partition(Object element, int numPartitions) {
+ if (targetPartition == -1) {
+ throw new RuntimeException("The partitioner has not been initialized properly");
+ }
+ return targetPartition;
+ }
+}
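
For the unbalanced case described in the javadoc above, a round-robin partitioner is the suggested alternative. The class below is a hypothetical sketch of such a partitioner, written only to illustrate the KafkaPartitioner open()/partition() contract; it is not part of this commit and the class name is invented.

    package org.apache.flink.streaming.connectors.kafka.partitioner;

    import java.io.Serializable;

    /** Illustrative round-robin partitioner; cycles records through all Kafka partitions. */
    public class RoundRobinPartitionerSketch extends KafkaPartitioner implements Serializable {

        private static final long serialVersionUID = 1L;

        private int[] partitions;
        private int next;

        @Override
        public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
            this.partitions = partitions;
            // start each parallel sink at a different partition so the instances
            // do not all begin by writing to the same partition
            this.next = (partitions.length > 0) ? (parallelInstanceId % partitions.length) : 0;
        }

        @Override
        public int partition(Object element, int numPartitions) {
            if (partitions == null || partitions.length == 0) {
                throw new RuntimeException("The partitioner has not been initialized properly");
            }
            int target = partitions[next];
            next = (next + 1) % partitions.length;
            return target;
        }
    }

As the javadoc warns, the price for the even spread is that every parallel sink keeps connections to all brokers that lead a partition of the topic.
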
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..55519f0
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+import kafka.producer.Partitioner;
+
+import java.io.Serializable;
+
+/**
+ * Extended Kafka Partitioner.
+ * It contains an open() method which is called on each parallel instance.
+ * Partitioners have to be serializable!
+ */
+public abstract class KafkaPartitioner implements Partitioner, Serializable {
+
+ private static final long serialVersionUID = -1974260817778593473L;
+
+ /**
+ * Initializer for the Partitioner.
+ * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+ * @param parallelInstances the total number of parallel instances
+ * @param partitions an array describing the partition IDs of the available Kafka partitions.
+ */
+ public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+ // override this method if needed.
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
new file mode 100644
index 0000000..3d392aa
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the partition assignment is deterministic and stable.
+ */
+public class KafkaConsumerPartitionAssignmentTest {
+
+ @Test
+ public void testPartitionsEqualConsumers() {
+ try {
+ int[] partitions = {4, 52, 17, 1};
+
+ for (int i = 0; i < partitions.length; i++) {
+ List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+ partitions, "test-topic", partitions.length, i);
+
+ assertNotNull(parts);
+ assertEquals(1, parts.size());
+ assertTrue(contains(partitions, parts.get(0).partition()));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultiplePartitionsPerConsumers() {
+ try {
+ final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+
+ final Set<Integer> allPartitions = new HashSet<>();
+ for (int i : partitions) {
+ allPartitions.add(i);
+ }
+
+ final int numConsumers = 3;
+ final int minPartitionsPerConsumer = partitions.length / numConsumers;
+ final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
+
+ for (int i = 0; i < numConsumers; i++) {
+ List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+ partitions, "test-topic", numConsumers, i);
+
+ assertNotNull(parts);
+ assertTrue(parts.size() >= minPartitionsPerConsumer);
+ assertTrue(parts.size() <= maxPartitionsPerConsumer);
+
+ for (TopicPartition p : parts) {
+ // check that the element was actually contained
+ assertTrue(allPartitions.remove(p.partition()));
+ }
+ }
+
+ // all partitions must have been assigned
+ assertTrue(allPartitions.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPartitionsFewerThanConsumers() {
+ try {
+ final int[] partitions = {4, 52, 17, 1};
+
+ final Set<Integer> allPartitions = new HashSet<>();
+ for (int i : partitions) {
+ allPartitions.add(i);
+ }
+
+ final int numConsumers = 2 * partitions.length + 3;
+
+ for (int i = 0; i < numConsumers; i++) {
+ List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
+ partitions, "test-topic", numConsumers, i);
+
+ assertNotNull(parts);
+ assertTrue(parts.size() <= 1);
+
+ for (TopicPartition p : parts) {
+ // check that the element was actually contained
+ assertTrue(allPartitions.remove(p.partition()));
+ }
+ }
+
+ // all partitions must have been assigned
+ assertTrue(allPartitions.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAssignEmptyPartitions() {
+ try {
+ List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
+ assertNotNull(parts1);
+ assertTrue(parts1.isEmpty());
+
+ List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
+ assertNotNull(parts2);
+ assertTrue(parts2.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGrowingPartitionsRemainsStable() {
+ try {
+ final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+ final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
+
+ final Set<Integer> allNewPartitions = new HashSet<>();
+ final Set<Integer> allInitialPartitions = new HashSet<>();
+ for (int i : newPartitions) {
+ allNewPartitions.add(i);
+ }
+ for (int i : initialPartitions) {
+ allInitialPartitions.add(i);
+ }
+
+ final int numConsumers = 3;
+ final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
+ final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
+ final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
+ final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
+
+ List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
+ initialPartitions, "test-topic", numConsumers, 0);
+ List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
+ initialPartitions, "test-topic", numConsumers, 1);
+ List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
+ initialPartitions, "test-topic", numConsumers, 2);
+
+ assertNotNull(parts1);
+ assertNotNull(parts2);
+ assertNotNull(parts3);
+
+ assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
+ assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
+ assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
+ assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
+
+ for (TopicPartition p : parts1) {
+ // check that the element was actually contained
+ assertTrue(allInitialPartitions.remove(p.partition()));
+ }
+ for (TopicPartition p : parts2) {
+ // check that the element was actually contained
+ assertTrue(allInitialPartitions.remove(p.partition()));
+ }
+ for (TopicPartition p : parts3) {
+ // check that the element was actually contained
+ assertTrue(allInitialPartitions.remove(p.partition()));
+ }
+
+ // all partitions must have been assigned
+ assertTrue(allInitialPartitions.isEmpty());
+
+ // grow the set of partitions and distribute anew
+
+ List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
+ newPartitions, "test-topic", numConsumers, 0);
+ List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
+ newPartitions, "test-topic", numConsumers, 1);
+ List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
+ newPartitions, "test-topic", numConsumers, 2);
+
+ // new partitions must include all old partitions
+
+ assertTrue(parts1new.size() > parts1.size());
+ assertTrue(parts2new.size() > parts2.size());
+ assertTrue(parts3new.size() > parts3.size());
+
+ assertTrue(parts1new.containsAll(parts1));
+ assertTrue(parts2new.containsAll(parts2));
+ assertTrue(parts3new.containsAll(parts3));
+
+ assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
+ assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
+ assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
+ assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
+ assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
+ assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
+
+ for (TopicPartition p : parts1new) {
+ // check that the element was actually contained
+ assertTrue(allNewPartitions.remove(p.partition()));
+ }
+ for (TopicPartition p : parts2new) {
+ // check that the element was actually contained
+ assertTrue(allNewPartitions.remove(p.partition()));
+ }
+ for (TopicPartition p : parts3new) {
+ // check that the element was actually contained
+ assertTrue(allNewPartitions.remove(p.partition()));
+ }
+
+ // all partitions must have been assigned
+ assertTrue(allNewPartitions.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static boolean contains(int[] array, int value) {
+ for (int i : array) {
+ if (i == value) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
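
To make the invariant these tests check concrete, here is a hypothetical stand-in for FlinkKafkaConsumer.assignPartitions(): the partition at array index i goes to consumer (i % numConsumers). This is only a sketch consistent with the assertions above; the actual implementation lives elsewhere in this commit and may differ in detail. (Assumes java.util.ArrayList is additionally imported; List and TopicPartition are already imported by the test class.)

    // Illustrative only: deterministic, index-modulo assignment. Because a grown
    // partition array keeps the old partitions at their old indices, every consumer
    // keeps its previous partitions and only gains new ones, which is exactly what
    // testGrowingPartitionsRemainsStable() verifies.
    static List<TopicPartition> assignPartitionsSketch(int[] partitions, String topicName,
                                                       int numConsumers, int consumerIndex) {
        List<TopicPartition> assigned = new ArrayList<>();
        for (int i = 0; i < partitions.length; i++) {
            if (i % numConsumers == consumerIndex) {
                assigned.add(new TopicPartition(topicName, partitions[i]));
            }
        }
        return assigned;
    }
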
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
new file mode 100644
index 0000000..e35fcfb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class KafkaConsumerTest {
+
+ @Test
+ public void testValidateZooKeeperConfig() {
+ try {
+ // empty
+ Properties emptyProperties = new Properties();
+ try {
+ FlinkKafkaConsumer.validateZooKeeperConfig(emptyProperties);
+ fail("should fail with an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ // no connect string (only group string)
+ Properties noConnect = new Properties();
+ noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
+ try {
+ FlinkKafkaConsumer.validateZooKeeperConfig(noConnect);
+ fail("should fail with an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ // no group string (only connect string)
+ Properties noGroup = new Properties();
+ noGroup.put("zookeeper.connect", "localhost:47574");
+ try {
+ FlinkKafkaConsumer.validateZooKeeperConfig(noGroup);
+ fail("should fail with an exception");
+ }
+ catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSnapshot() {
+ try {
+ Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
+ Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
+ Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
+
+ offsetsField.setAccessible(true);
+ runningField.setAccessible(true);
+ mapField.setAccessible(true);
+
+ FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
+ when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+
+ long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
+ LinkedMap map = new LinkedMap();
+
+ offsetsField.set(consumer, testOffsets);
+ runningField.set(consumer, true);
+ mapField.set(consumer, map);
+
+ assertTrue(map.isEmpty());
+
+ // make multiple checkpoints
+ for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
+ long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
+ assertArrayEquals(testOffsets, checkpoint);
+
+ // change the offsets, make sure the snapshot did not change
+ long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
+
+ for (int i = 0; i < testOffsets.length; i++) {
+ testOffsets[i] += 1L;
+ }
+
+ assertArrayEquals(checkpointCopy, checkpoint);
+
+ assertTrue(map.size() > 0);
+ assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ @Ignore("Kafka consumer internally makes an infinite loop")
+ public void testCreateSourceWithoutCluster() {
+ try {
+ Properties props = new Properties();
+ props.setProperty("zookeeper.connect", "localhost:56794");
+ props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
+ props.setProperty("group.id", "non-existent-group");
+
+ new FlinkKafkaConsumer<>("no op topic", new JavaDefaultStringSchema(), props,
+ FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
+ FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}