You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:04 UTC
[48/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
deleted file mode 100644
index 8066b3c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ /dev/null
@@ -1,689 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
- * Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions.
- *
- * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once".
- * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
- *
- * <p>To support a variety of Kafka brokers, protocol versions, and offset committing approaches,
- * the Flink Kafka Consumer can be parametrized with a <i>fetcher</i> and an <i>offset handler</i>.</p>
- *
- * <h1>Fetcher</h1>
- *
- * <p>The fetcher is responsible to pull data from Kafka. Because Kafka has undergone a change in
- * protocols and APIs, there are currently two fetchers available:</p>
- *
- * <ul>
- * <li>{@link FetcherType#NEW_HIGH_LEVEL}: A fetcher based on the new Kafka consumer API.
- * This fetcher is generally more robust, but works only with later versions of
- * Kafka (> 0.8.2).</li>
- *
- * <li>{@link FetcherType#LEGACY_LOW_LEVEL}: A fetcher based on the old low-level consumer API.
- * This fetcher is works also with older versions of Kafka (0.8.1). The fetcher interprets
- * the old Kafka consumer properties, like:
- * <ul>
- * <li>socket.timeout.ms</li>
- * <li>socket.receive.buffer.bytes</li>
- * <li>fetch.message.max.bytes</li>
- * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
- * <li>fetch.wait.max.ms</li>
- * </ul>
- * </li>
- * </ul>
- *
- * <h1>Offset handler</h1>
- *
- * <p>Offsets whose records have been read and are checkpointed will be committed back to Kafka / ZooKeeper
- * by the offset handler. In addition, the offset handler finds the point where the source initially
- * starts reading from the stream, when the streaming job is started.</p>
- *
- * <p>Currently, the source offers two different offset handlers exist:</p>
- * <ul>
- * <li>{@link OffsetStore#KAFKA}: Use this offset handler when the Kafka brokers are managing the offsets,
- * and hence offsets need to be committed the Kafka brokers, rather than to ZooKeeper.
- * Note that this offset handler works only on new versions of Kafka (0.8.2.x +) and
- * with the {@link FetcherType#NEW_HIGH_LEVEL} fetcher.</li>
- *
- * <li>{@link OffsetStore#FLINK_ZOOKEEPER}: Use this offset handler when the offsets are managed
- * by ZooKeeper, as in older versions of Kafka (0.8.1.x)</li>
- * </ul>
- *
- * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
- * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
- * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
- * has consumed a topic.</p>
- *
- * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
- * is constructed. That means that the client that submits the program needs to be able to
- * reach the Kafka brokers or ZooKeeper.</p>
- */
-public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
- implements CheckpointNotifier, CheckpointedAsynchronously<long[]>, ResultTypeQueryable<T> {
-
- /**
- * The offset store defines how acknowledged offsets are committed back to Kafka. Different
- * options include letting Flink periodically commit to ZooKeeper, or letting Kafka manage the
- * offsets (new Kafka versions only).
- */
- public enum OffsetStore {
-
- /**
- * Let Flink manage the offsets. Flink will periodically commit them to Zookeeper (usually after
- * successful checkpoints), in the same structure as Kafka 0.8.2.x
- *
- * <p>Use this mode when using the source with Kafka 0.8.1.x brokers.</p>
- */
- FLINK_ZOOKEEPER,
-
- /**
- * Use the mechanisms in Kafka to commit offsets. Depending on the Kafka configuration, different
- * mechanism will be used (broker coordinator, zookeeper)
- */
- KAFKA
- }
-
- /**
- * The fetcher type defines which code paths to use to pull data from teh Kafka broker.
- */
- public enum FetcherType {
-
- /**
- * The legacy fetcher uses Kafka's old low-level consumer API.
- *
- * <p>Use this fetcher for Kafka 0.8.1 brokers.</p>
- */
- LEGACY_LOW_LEVEL,
-
- /**
- * This fetcher uses a backport of the new consumer API to pull data from the Kafka broker.
- * It is the fetcher that will be maintained in the future, and it already
- * handles certain failure cases with less overhead than the legacy fetcher.
- *
- * <p>This fetcher works only Kafka 0.8.2 and 0.8.3 (and future versions).</p>
- */
- NEW_HIGH_LEVEL
- }
-
- // ------------------------------------------------------------------------
-
- private static final long serialVersionUID = -6272159445203409112L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
-
- /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
- * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
- public static final long OFFSET_NOT_SET = -915623761776L;
-
- /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
- public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
- /** Configuration key for the number of retries for getting the partition info */
- public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
-
- /** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
- public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
-
-
-
- // ------ Configuration of the Consumer -------
-
- /** The offset store where this consumer commits safe offsets */
- private final OffsetStore offsetStore;
-
- /** The type of fetcher to be used to pull data from Kafka */
- private final FetcherType fetcherType;
-
- /** name of the topic consumed by this source */
- private final String topic;
-
- /** The properties to parametrize the Kafka consumer and ZooKeeper client */
- private final Properties props;
-
- /** The ids of the partitions that are read by this consumer */
- private final int[] partitions;
-
- /** The schema to convert between Kafka#s byte messages, and Flink's objects */
- private final DeserializationSchema<T> valueDeserializer;
-
- // ------ Runtime State -------
-
- /** Data for pending but uncommitted checkpoints */
- private final LinkedMap pendingCheckpoints = new LinkedMap();
-
- /** The fetcher used to pull data from the Kafka brokers */
- private transient Fetcher fetcher;
-
- /** The committer that persists the committed offsets */
- private transient OffsetHandler offsetHandler;
-
- /** The partitions actually handled by this consumer */
- private transient List<TopicPartition> subscribedPartitions;
-
- /** The offsets of the last returned elements */
- private transient long[] lastOffsets;
-
- /** The latest offsets that have been committed to Kafka or ZooKeeper. These are never
- * newer then the last offsets (Flink's internal view is fresher) */
- private transient long[] commitedOffsets;
-
- /** The offsets to restore to, if the consumer restores state from a checkpoint */
- private transient long[] restoreToOffset;
-
- private volatile boolean running = true;
-
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
- *
- * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs
- * at the beginnign of this class.</p>
- *
- * @param topic
- * The Kafka topic to read from.
- * @param valueDeserializer
- * The deserializer to turn raw byte messages into Java/Scala objects.
- * @param props
- * The properties that are used to configure both the fetcher and the offset handler.
- * @param offsetStore
- * The type of offset store to use (Kafka / ZooKeeper)
- * @param fetcherType
- * The type of fetcher to use (new high-level API, old low-level API).
- */
- public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props,
- OffsetStore offsetStore, FetcherType fetcherType) {
- this.offsetStore = checkNotNull(offsetStore);
- this.fetcherType = checkNotNull(fetcherType);
-
- if(fetcherType == FetcherType.NEW_HIGH_LEVEL) {
- throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 is not yet " +
- "supported in Flink");
- }
- if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
- throw new IllegalArgumentException(
- "The Kafka offset handler cannot be used together with the old low-level fetcher.");
- }
-
- this.topic = checkNotNull(topic, "topic");
- this.props = checkNotNull(props, "props");
- this.valueDeserializer = checkNotNull(valueDeserializer, "valueDeserializer");
-
- // validate the zookeeper properties
- if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
- validateZooKeeperConfig(props);
- }
-
- // Connect to a broker to get the partitions
- List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
-
- // get initial partitions list. The order of the partitions is important for consistent
- // partition id assignment in restart cases.
- this.partitions = new int[partitionInfos.size()];
- for (int i = 0; i < partitionInfos.size(); i++) {
- partitions[i] = partitionInfos.get(i).partition();
-
- if (partitions[i] >= partitions.length) {
- throw new RuntimeException("Kafka partition numbers are sparse");
- }
- }
- LOG.info("Topic {} has {} partitions", topic, partitions.length);
-
- // make sure that we take care of the committing
- props.setProperty("enable.auto.commit", "false");
- }
-
- // ------------------------------------------------------------------------
- // Source life cycle
- // ------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
- final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
-
- // pick which partitions we work on
- subscribedPartitions = assignPartitions(this.partitions, this.topic, numConsumers, thisComsumerIndex);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Kafka consumer {} will read partitions {} out of partitions {}",
- thisComsumerIndex, subscribedPartitions, Arrays.toString(partitions));
- }
-
- // we leave the fetcher as null, if we have no partitions
- if (subscribedPartitions.isEmpty()) {
- LOG.info("Kafka consumer {} has no partitions (empty source)", thisComsumerIndex);
- return;
- }
-
- // create fetcher
- switch (fetcherType){
- case NEW_HIGH_LEVEL:
- throw new UnsupportedOperationException("Currently unsupported");
- case LEGACY_LOW_LEVEL:
- fetcher = new LegacyFetcher(topic, props, getRuntimeContext().getTaskName());
- break;
- default:
- throw new RuntimeException("Requested unknown fetcher " + fetcher);
- }
- fetcher.setPartitionsToRead(subscribedPartitions);
-
- // offset handling
- switch (offsetStore){
- case FLINK_ZOOKEEPER:
- offsetHandler = new ZookeeperOffsetHandler(props);
- break;
- case KAFKA:
- throw new Exception("Kafka offset handler cannot work with legacy fetcher");
- default:
- throw new RuntimeException("Requested unknown offset store " + offsetStore);
- }
-
- // set up operator state
- lastOffsets = new long[partitions.length];
- commitedOffsets = new long[partitions.length];
-
- Arrays.fill(lastOffsets, OFFSET_NOT_SET);
- Arrays.fill(commitedOffsets, OFFSET_NOT_SET);
-
- // seek to last known pos, from restore request
- if (restoreToOffset != null) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Consumer {} found offsets from previous checkpoint: {}",
- thisComsumerIndex, Arrays.toString(restoreToOffset));
- }
-
- for (int i = 0; i < restoreToOffset.length; i++) {
- long restoredOffset = restoreToOffset[i];
- if (restoredOffset != OFFSET_NOT_SET) {
- // if this fails because we are not subscribed to the topic, then the
- // partition assignment is not deterministic!
-
- // we set the offset +1 here, because seek() is accepting the next offset to read,
- // but the restore offset is the last read offset
- fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
- lastOffsets[i] = restoredOffset;
- }
- }
- }
- else {
- // no restore request. Let the offset handler take care of the initial offset seeking
- offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
- }
- }
-
- @Override
- public void run(SourceContext<T> sourceContext) throws Exception {
- if (fetcher != null) {
- fetcher.run(sourceContext, valueDeserializer, lastOffsets);
- }
- else {
- // this source never completes
- final Object waitLock = new Object();
- while (running) {
- // wait until we are canceled
- try {
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (waitLock) {
- waitLock.wait();
- }
- }
- catch (InterruptedException e) {
- // do nothing, check our "running" status
- }
- }
- }
-
- // close the context after the work was done. this can actually only
- // happen when the fetcher decides to stop fetching
- sourceContext.close();
- }
-
- @Override
- public void cancel() {
- // set ourselves as not running
- running = false;
-
- // close the fetcher to interrupt any work
- Fetcher fetcher = this.fetcher;
- this.fetcher = null;
- if (fetcher != null) {
- try {
- fetcher.close();
- }
- catch (IOException e) {
- LOG.warn("Error while closing Kafka connector data fetcher", e);
- }
- }
-
- OffsetHandler offsetHandler = this.offsetHandler;
- this.offsetHandler = null;
- if (offsetHandler != null) {
- try {
- offsetHandler.close();
- }
- catch (IOException e) {
- LOG.warn("Error while closing Kafka connector offset handler", e);
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- cancel();
- super.close();
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return valueDeserializer.getProducedType();
- }
-
- // ------------------------------------------------------------------------
- // Checkpoint and restore
- // ------------------------------------------------------------------------
-
- @Override
- public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- if (lastOffsets == null) {
- LOG.debug("snapshotState() requested on not yet opened source; returning null.");
- return null;
- }
- if (!running) {
- LOG.debug("snapshotState() called on closed source");
- return null;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
- Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
- }
-
- long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
-
- // the map cannot be asynchronously updated, because only one checkpoint call can happen
- // on this function at a time: either snapshotState() or notifyCheckpointComplete()
- pendingCheckpoints.put(checkpointId, currentOffsets);
-
- while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
- pendingCheckpoints.remove(0);
- }
-
- return currentOffsets;
- }
-
- @Override
- public void restoreState(long[] restoredOffsets) {
- restoreToOffset = restoredOffsets;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- if (fetcher == null) {
- LOG.debug("notifyCheckpointComplete() called on uninitialized source");
- return;
- }
- if (!running) {
- LOG.debug("notifyCheckpointComplete() called on closed source");
- return;
- }
-
- // only one commit operation must be in progress
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
- }
-
- try {
- long[] checkpointOffsets;
-
- // the map may be asynchronously updates when snapshotting state, so we synchronize
- synchronized (pendingCheckpoints) {
- final int posInMap = pendingCheckpoints.indexOf(checkpointId);
- if (posInMap == -1) {
- LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
- return;
- }
-
- checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
-
- // remove older checkpoints in map
- for (int i = 0; i < posInMap; i++) {
- pendingCheckpoints.remove(0);
- }
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
- }
-
- // build the map of (topic,partition) -> committed offset
- Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
- for (TopicPartition tp : subscribedPartitions) {
-
- int partition = tp.partition();
- long offset = checkpointOffsets[partition];
- long lastCommitted = commitedOffsets[partition];
-
- if (offset != OFFSET_NOT_SET) {
- if (offset > lastCommitted) {
- offsetsToCommit.put(tp, offset);
- LOG.debug("Committing offset {} for partition {}", offset, partition);
- }
- else {
- LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
- }
- }
- }
-
- offsetHandler.commit(offsetsToCommit);
- }
- catch (Exception e) {
- if (running) {
- throw e;
- }
- // else ignore exception if we are no longer running
- }
- }
-
- // ------------------------------------------------------------------------
- // Miscellaneous utilities
- // ------------------------------------------------------------------------
-
- protected static List<TopicPartition> assignPartitions(int[] partitions, String topicName,
- int numConsumers, int consumerIndex) {
- checkArgument(numConsumers > 0);
- checkArgument(consumerIndex < numConsumers);
-
- List<TopicPartition> partitionsToSub = new ArrayList<>();
-
- for (int i = 0; i < partitions.length; i++) {
- if (i % numConsumers == consumerIndex) {
- partitionsToSub.add(new TopicPartition(topicName, partitions[i]));
- }
- }
- return partitionsToSub;
- }
-
- // ------------------------------------------------------------------------
- // Kafka / ZooKeeper communication utilities
- // ------------------------------------------------------------------------
-
- /**
- * Send request to Kafka to get partitions for topic.
- *
- * @param topic The name of the topic.
- * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
- */
- public static List<PartitionInfo> getPartitionsForTopic(final String topic, final Properties properties) {
- String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
- final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));
-
- checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
- String[] seedBrokers = seedBrokersConfString.split(",");
- List<PartitionInfo> partitions = new ArrayList<>();
-
- Random rnd = new Random();
- retryLoop: for(int retry = 0; retry < numRetries; retry++) {
- // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
- // parallel source instances start. Still, we try all available brokers.
- int index = rnd.nextInt(seedBrokers.length);
- brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
- String seedBroker = seedBrokers[index];
- LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
- if (++index == seedBrokers.length) {
- index = 0;
- }
-
- URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
- SimpleConsumer consumer = null;
- try {
- final String clientId = "flink-kafka-consumer-partition-lookup";
- final int soTimeout = Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000"));
- final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
- consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
-
- List<String> topics = Collections.singletonList(topic);
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
- List<TopicMetadata> metaData = resp.topicsMetadata();
-
- // clear in case we have an incomplete list from previous tries
- partitions.clear();
- for (TopicMetadata item : metaData) {
- if (item.errorCode() != ErrorMapping.NoError()) {
- if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
- // fail hard if topic is unknown
- throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode()));
- }
- // warn and try more brokers
- LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topic,
- ErrorMapping.exceptionFor(item.errorCode()));
- continue brokersLoop;
- }
- if (!item.topic().equals(topic)) {
- LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
- continue brokersLoop;
- }
- for (PartitionMetadata part : item.partitionsMetadata()) {
- Node leader = brokerToNode(part.leader());
- Node[] replicas = new Node[part.replicas().size()];
- for (int i = 0; i < part.replicas().size(); i++) {
- replicas[i] = brokerToNode(part.replicas().get(i));
- }
-
- Node[] ISRs = new Node[part.isr().size()];
- for (int i = 0; i < part.isr().size(); i++) {
- ISRs[i] = brokerToNode(part.isr().get(i));
- }
- PartitionInfo pInfo = new PartitionInfo(topic, part.partitionId(), leader, replicas, ISRs);
- partitions.add(pInfo);
- }
- }
- break retryLoop; // leave the loop through the brokers
- } catch (Exception e) {
- LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topic, e);
- } finally {
- if (consumer != null) {
- consumer.close();
- }
- }
- } // brokers loop
- } // retries loop
- return partitions;
- }
-
- private static Node brokerToNode(Broker broker) {
- return new Node(broker.id(), broker.host(), broker.port());
- }
-
- protected static void validateZooKeeperConfig(Properties props) {
- if (props.getProperty("zookeeper.connect") == null) {
- throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
- }
- if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
- throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
- + "' has not been set in the properties");
- }
-
- try {
- //noinspection ResultOfMethodCallIgnored
- Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
- }
- catch (NumberFormatException e) {
- throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
- }
-
- try {
- //noinspection ResultOfMethodCallIgnored
- Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
- }
- catch (NumberFormatException e) {
- throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
deleted file mode 100644
index 21f24e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.1.x brokers.
- * The consumer will internally use the old low-level Kafka API, and manually commit offsets
- * partition offsets to ZooKeeper.
- *
- * <p>The following additional configuration values are available:</p>
- * <ul>
- * <li>socket.timeout.ms</li>
- * <li>socket.receive.buffer.bytes</li>
- * <li>fetch.message.max.bytes</li>
- * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
- * <li>fetch.wait.max.ms</li>
- * </ul>
- *
- * @param <T> The type of elements produced by this consumer.
- */
-public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
-
- private static final long serialVersionUID = -5649906773771949146L;
-
- /**
- * Creates a new Kafka 0.8.1.x streaming source consumer.
- *
- * @param topic
- * The name of the topic that should be consumed.
- * @param valueDeserializer
- * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param props
- * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
- */
- public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
- super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
deleted file mode 100644
index 77e41e5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Kafka consumer for reading from Kafka 0.8.2.x brokers.
- *
- * <p>Internally, the consumer uses Kafka's old low-level API and manually commits the
- * partition offsets to ZooKeeper.</p>
- *
- * <p>Once Kafka ships the new consumer with Kafka 0.8.3, Flink might use the 0.8.3 consumer
- * API against Kafka 0.8.2 installations as well.</p>
- *
- * @param <T> The type of elements produced by this consumer.
- */
-public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
-
- private static final long serialVersionUID = -8450689820627198228L;
-
- /**
- * Builds a streaming source that reads the given topic from Kafka 0.8.2.x brokers.
- *
- * <p>The consumer is configured with {@link OffsetStore#FLINK_ZOOKEEPER} as offset store
- * and {@link FetcherType#LEGACY_LOW_LEVEL} as fetcher type.</p>
- *
- * @param topic
- * Name of the Kafka topic to consume.
- * @param valueDeserializer
- * Schema that converts between Kafka's raw byte messages and Flink's objects.
- * @param props
- * Configuration for both the Kafka consumer client and the ZooKeeper client.
- */
- public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
- super(topic, valueDeserializer, props,
- OffsetStore.FLINK_ZOOKEEPER,
- FetcherType.LEGACY_LOW_LEVEL);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
deleted file mode 100644
index 715f5ee..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic.
- *
- * <p>Please note that this producer does not have any reliability guarantees.</p>
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Array with the partition ids of the given topicId.
- * The size of this array is the number of partitions.
- */
- private final int[] partitions;
-
- /**
- * User defined properties for the Producer.
- */
- private final Properties producerConfig;
-
- /**
- * The name of the topic this producer is writing data to.
- */
- private final String topicId;
-
- /**
- * (Serializable) SerializationSchema for turning objects used with Flink into
- * byte[] for Kafka.
- */
- private final SerializationSchema<IN, byte[]> schema;
-
- /**
- * Partitioner for assigning an object to a Kafka partition: the user-provided one,
- * or a {@link FixedPartitioner} if none was given.
- */
- private final KafkaPartitioner partitioner;
-
- /**
- * Flag indicating whether to accept failures (and log them), or to fail on failures.
- */
- private boolean logFailuresOnly;
-
- // -------------------------------- Runtime fields ------------------------------------------
-
- /** KafkaProducer instance, created in {@link #open(Configuration)}. */
- private transient KafkaProducer<byte[], byte[]> producer;
-
- /** The callback that handles error propagation or logging callbacks. */
- private transient Callback callback;
-
- /** Errors encountered in the async producer are stored here. */
- private transient volatile Exception asyncException;
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
- * the topic.
- *
- * @param brokerList
- * Comma separated addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema.
- */
- public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
- this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
- * the topic.
- *
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema.
- * @param producerConfig
- * Properties with the producer configuration.
- */
- public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
- this(topicId, serializationSchema, producerConfig, null);
- }
-
- /**
- * The main constructor for creating a FlinkKafkaProducer.
- *
- * <p>If the given properties do not define a key or value serializer, the
- * {@link ByteArraySerializer} is set in the passed {@code producerConfig} object.
- * A temporary KafkaProducer is created to look up the partitions of the topic.</p>
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null to use a {@link FixedPartitioner}.
- * @throws NullPointerException if topicId, serializationSchema or producerConfig is null
- * @throws IllegalArgumentException if no partitions can be retrieved for the topic
- */
- public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
- Preconditions.checkNotNull(topicId, "TopicID not set");
- Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
- Preconditions.checkNotNull(producerConfig, "producerConfig not set");
- ClosureCleaner.ensureSerializable(customPartitioner);
- ClosureCleaner.ensureSerializable(serializationSchema);
-
- this.topicId = topicId;
- this.schema = serializationSchema;
- this.producerConfig = producerConfig;
-
- // set the producer configuration properties.
- // NOTE: Properties#contains(Object) searches the *values* of the table, so
- // containsKey(...) is required to detect a user-configured serializer.
-
- if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
- this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
- } else {
- LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- }
-
- if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
- this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
- } else {
- LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- }
-
- // create a local KafkaProducer to get the list of partitions.
- // this will also ensure locally that all required ProducerConfig values are set.
- // try-with-resources closes the temporary producer, no explicit close() needed.
- try (KafkaProducer<byte[], byte[]> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
- List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
- if (partitionsList == null || partitionsList.isEmpty()) {
- // fail early with a clear message instead of an NPE here or a failure in the partitioner later
- throw new IllegalArgumentException("Unable to retrieve any partitions for topic " + topicId);
- }
-
- this.partitions = new int[partitionsList.size()];
- for (int i = 0; i < partitions.length; i++) {
- partitions[i] = partitionsList.get(i).partition();
- }
- }
-
- if (customPartitioner == null) {
- this.partitioner = new FixedPartitioner();
- } else {
- this.partitioner = customPartitioner;
- }
- }
-
- // ---------------------------------- Properties --------------------------
-
- /**
- * Defines whether the producer should fail on errors, or only log them.
- * If this is set to true, then exceptions will be only logged, if set to false,
- * exceptions will be eventually thrown and cause the streaming program to
- * fail (and enter recovery).
- *
- * @param logFailuresOnly The flag to indicate logging-only on exceptions.
- */
- public void setLogFailuresOnly(boolean logFailuresOnly) {
- this.logFailuresOnly = logFailuresOnly;
- }
-
- // ----------------------------------- Lifecycle --------------------------
-
- /**
- * Initializes the connection to Kafka.
- *
- * <p>Creates the KafkaProducer, opens the partitioner for this subtask, and installs
- * a callback that either only logs send failures or records the first one so that it
- * is rethrown by a later {@link #invoke(Object)} or {@link #close()}.</p>
- */
- @Override
- public void open(Configuration configuration) {
- producer = new KafkaProducer<>(this.producerConfig);
-
- RuntimeContext ctx = getRuntimeContext();
- partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
-
- LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}",
- ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId);
-
- if (logFailuresOnly) {
- callback = new Callback() {
-
- @Override
- public void onCompletion(RecordMetadata metadata, Exception e) {
- if (e != null) {
- LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
- }
- }
- };
- }
- else {
- callback = new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- // keep only the first asynchronous failure
- if (exception != null && asyncException == null) {
- asyncException = exception;
- }
- }
- };
- }
- }
-
- /**
- * Called when new data arrives to the sink, and forwards it to Kafka.
- *
- * @param next
- * The incoming data
- * @throws Exception if an earlier asynchronous send failed and failures are not only logged
- */
- @Override
- public void invoke(IN next) throws Exception {
- // propagate asynchronous errors
- checkErroneous();
-
- byte[] serialized = schema.serialize(next);
- ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicId,
- partitioner.partition(next, partitions.length),
- null, serialized);
-
- producer.send(record, callback);
- }
-
- /**
- * Closes the KafkaProducer (if it was opened) and rethrows any pending asynchronous error.
- */
- @Override
- public void close() throws Exception {
- if (producer != null) {
- producer.close();
- }
-
- // make sure we propagate pending errors
- checkErroneous();
- }
-
-
- // ----------------------------------- Utilities --------------------------
-
- /**
- * Throws the recorded asynchronous error, if any, and clears it so it is thrown only once.
- */
- private void checkErroneous() throws Exception {
- Exception e = asyncException;
- if (e != null) {
- // prevent double throwing
- asyncException = null;
- throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
- }
- }
-
- /**
- * Creates producer properties whose only entry is the bootstrap server list.
- *
- * @param brokerList Comma separated addresses of the brokers; each entry is validated
- * with {@link NetUtils#getCorrectHostnamePort(String)}.
- * @return Properties with {@code bootstrap.servers} set to {@code brokerList}.
- */
- public static Properties getPropertiesFromBrokerList(String brokerList) {
- String[] elements = brokerList.split(",");
- for (String broker : elements) {
- NetUtils.getCorrectHostnamePort(broker);
- }
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- return props;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
deleted file mode 100644
index f856926..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.api;
-
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * <p>The KafkaSink has been superseded by
- * {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer}, which it extends.
- * This class will be removed in future releases of Flink.</p>
- *
- * @param <IN> Type of the messages to write into Kafka.
- * @deprecated Use {@link FlinkKafkaProducer} instead.
- */
-@Deprecated
-public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
-
- /**
- * Creates a sink that writes to the given topic; delegates to
- * {@link FlinkKafkaProducer#FlinkKafkaProducer(String, String, SerializationSchema)}.
- *
- * @param brokerList Comma separated addresses of the brokers.
- * @param topicId ID of the Kafka topic.
- * @param serializationSchema User defined serialization schema.
- */
- public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
- super(brokerList, topicId, serializationSchema);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
deleted file mode 100644
index 869c44f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.persistent;
-
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.1+ brokers.
- *
- * <p>This class is provided as a migration path from the old Flink Kafka connectors to the new,
- * updated implementations.</p>
- *
- * <p>Please use {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081} or
- * {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082} instead.</p>
- *
- * @param <T> The type of elements produced by this consumer.
- * @deprecated Use FlinkKafkaConsumer081 or FlinkKafkaConsumer082 instead.
- */
-@Deprecated
-public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
-
- // NOTE(review): this value equals FlinkKafkaConsumer082's serialVersionUID; it is left
- // unchanged because changing it would alter the serialized form of this class.
- private static final long serialVersionUID = -8450689820627198228L;
-
- /**
- * Creates a new Kafka 0.8.1+ streaming source consumer.
- *
- * @param topic
- * The name of the topic that should be consumed.
- * @param valueDeserializer
- * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
- * @param consumerConfig
- * The Kafka consumer config; its properties configure the Kafka consumer client and the ZooKeeper client.
- */
- public PersistentKafkaSource(String topic, DeserializationSchema<T> valueDeserializer, ConsumerConfig consumerConfig) {
- super(topic, valueDeserializer, consumerConfig.props().props(), OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
deleted file mode 100644
index 4345926..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A fetcher pulls data from Kafka, from a fixed set of partitions.
- * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset.
- */
-public interface Fetcher {
-
- /**
- * Sets which partitions the fetcher should pull from.
- *
- * @param partitions The list of partitions for a topic that the fetcher will pull from.
- */
- void setPartitionsToRead(List<TopicPartition> partitions);
-
- /**
- * Closes the fetcher. This will stop any operation in the
- * {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually
- * close underlying connections and release all resources.
- *
- * @throws IOException if closing fails.
- */
- void close() throws IOException;
-
- /**
- * Starts fetching data from Kafka and emitting it into the stream.
- *
- * <p>To provide exactly-once guarantees, the fetcher needs to emit a record and update
- * the last consumed offset in one atomic operation:</p>
- * <pre>{@code
- *
- * while (running) {
- * T next = ...
- * long offset = ...
- * int partition = ...
- * synchronized (sourceContext.getCheckpointLock()) {
- * sourceContext.collect(next);
- * lastOffsets[partition] = offset;
- * }
- * }
- * }</pre>
- *
- * @param sourceContext The source context to emit elements to.
- * @param valueDeserializer The deserializer to decode the raw values with.
- * @param lastOffsets The array into which to store the offsets of the emitted elements.
- *
- * @param <T> The type of elements produced by the fetcher and emitted to the source context.
- * @throws Exception if fetching or emitting fails.
- */
- <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer,
- long[] lastOffsets) throws Exception;
-
- /**
- * Sets the next offset to read from for the given partition.
- * For example, if the partition <i>i</i> offset is set to <i>n</i>, the Fetcher's next result
- * will be the message with <i>offset=n</i>.
- *
- * @param topicPartition The partition for which to seek the offset.
- * @param offsetToRead The offset to seek to.
- */
- void seek(TopicPartition topicPartition, long offsetToRead);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
deleted file mode 100644
index c4ba103..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ /dev/null
@@ -1,622 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.OffsetRequest;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * This fetcher uses Kafka's low-level API to pull data from a specific
- * set of partitions and offsets for a certain topic.
- *
- * <p>This code is in part based on the tutorial code for the low-level Kafka consumer.</p>
- */
-public class LegacyFetcher implements Fetcher {
-
- private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class);
-
- /** The topic from which this fetcher pulls data */
- private final String topic;
-
- /** The properties that configure the Kafka connection */
- private final Properties config;
-
- /** The task name, to give more readable names to the spawned threads */
- private final String taskName;
-
- /** The first error that occurred in a connection thread */
- private final AtomicReference<Throwable> error;
-
- /** The partitions that the fetcher should read, with their starting offsets */
- private Map<TopicPartition, Long> partitionsToRead;
-
- /** Reference to the thread that executes the run() method. */
- private volatile Thread mainThread;
-
- /** Flag to shut the fetcher down */
- private volatile boolean running = true;
-
- public LegacyFetcher(String topic, Properties props, String taskName) {
- this.config = checkNotNull(props, "The config properties cannot be null");
- this.topic = checkNotNull(topic, "The topic cannot be null");
- this.taskName = taskName;
- this.error = new AtomicReference<>();
- }
-
- // ------------------------------------------------------------------------
- // Fetcher methods
- // ------------------------------------------------------------------------
-
- @Override
- public void setPartitionsToRead(List<TopicPartition> partitions) {
- partitionsToRead = new HashMap<>(partitions.size());
- for (TopicPartition tp: partitions) {
- partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET);
- }
- }
-
- /**
- * Sets the starting offset of a partition previously passed to
- * {@link #setPartitionsToRead(List)}.
- *
- * @throws IllegalArgumentException if no partitions were set, or the partition is not among them.
- */
- @Override
- public void seek(TopicPartition topicPartition, long offsetToRead) {
- final Map<TopicPartition, Long> toRead = this.partitionsToRead;
- if (toRead == null) {
- throw new IllegalArgumentException("No partitions to read set");
- }
- if (!toRead.containsKey(topicPartition)) {
- throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
- + ") we are not going to read. Partitions to read " + toRead);
- }
- toRead.put(topicPartition, offsetToRead);
- }
-
- /**
- * Signals the fetcher to stop. Cleanup of the spawned threads is left to run().
- */
- @Override
- public void close() {
- // run() checks this flag before starting the consumer threads and in its wait loop.
- // NOTE(review): run() may be blocked in Thread.join() and only re-checks the flag after
- // the joins return or the main thread is interrupted.
- running = false;
- }
-
- /**
- * Looks up the lead broker of every partition to read, starts one consumer thread per
- * broker, and waits until all of them finish, the fetcher is closed, or a fetch thread
- * reports an error. All spawned consumer threads that are still alive are cancelled on exit.
- *
- * @throws IllegalArgumentException if no partitions to read were set.
- * @throws RuntimeException if a partition to read has no leader, or not all partitions to
- * read could be matched to partition infos with lead brokers.
- * @throws Exception wrapping the first error reported by a fetch thread.
- */
- @Override
- public <T> void run(SourceFunction.SourceContext<T> sourceContext,
- DeserializationSchema<T> valueDeserializer,
- long[] lastOffsets) throws Exception {
-
- if (partitionsToRead == null || partitionsToRead.isEmpty()) {
- throw new IllegalArgumentException("No partitions set");
- }
-
- // NOTE: This method needs to always release all resources it acquires
-
- this.mainThread = Thread.currentThread();
-
- LOG.info("Reading from partitions {} using the legacy fetcher", partitionsToRead);
-
- // get lead broker for each partition
-
- // NOTE: The kafka client apparently locks itself in an infinite loop sometimes
- // when it is interrupted, so we run it only in a separate thread.
- // since it sometimes refuses to shut down, we resort to the admittedly harsh
- // means of killing the thread after a timeout.
- PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
- infoFetcher.start();
-
- KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
- watchDog.start();
-
- final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
-
- // brokers to fetch partitions from, grouped by lead broker.
- int fetchPartitionsCount = 0;
- Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>();
-
- for (PartitionInfo partitionInfo : allPartitionsInTopic) {
- for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
- final TopicPartition topicPartition = entry.getKey();
- final long offset = entry.getValue();
-
- // check if that partition is for us; partitions this fetcher does not read are skipped
- if (topicPartition.partition() != partitionInfo.partition()) {
- continue;
- }
-
- // only partitions we actually read need a leader; a leaderless partition
- // that this fetcher does not read must not fail it
- final Node leader = partitionInfo.leader();
- if (leader == null) {
- throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
- + " from topic " + partitionInfo.topic() + " because it does not have a leader");
- }
-
- List<FetchPartition> partitions = fetchBrokers.get(leader);
- if (partitions == null) {
- partitions = new ArrayList<>();
- fetchBrokers.put(leader, partitions);
- }
-
- partitions.add(new FetchPartition(topicPartition.partition(), offset));
- fetchPartitionsCount++;
- }
- }
-
- if (partitionsToRead.size() != fetchPartitionsCount) {
- throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
- + fetchPartitionsCount + " partition infos with lead brokers.");
- }
-
- // create SimpleConsumers for each broker
- List<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
-
- for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
- final Node broker = brokerInfo.getKey();
- final List<FetchPartition> partitionsList = brokerInfo.getValue();
-
- FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
-
- SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, topic,
- broker, partitions, sourceContext, valueDeserializer, lastOffsets);
-
- thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
- taskName, broker.id(), broker.host(), broker.port()));
- thread.setDaemon(true);
- consumers.add(thread);
- }
-
- // last check whether we should abort.
- if (!running) {
- return;
- }
-
- // start all consumer threads
- for (SimpleConsumerThread<?> t : consumers) {
- LOG.info("Starting thread {}", t.getName());
- t.start();
- }
-
- // wait until all consumer threads are done, or until we are aborted, or until
- // an error occurred in one of the fetcher threads
- try {
- boolean someConsumersRunning = true;
- while (running && error.get() == null && someConsumersRunning) {
- try {
- // wait for the consumer threads. if an error occurs, we are interrupted
- for (SimpleConsumerThread<?> t : consumers) {
- t.join();
- }
-
- // safety net
- someConsumersRunning = false;
- for (SimpleConsumerThread<?> t : consumers) {
- someConsumersRunning |= t.isAlive();
- }
- }
- catch (InterruptedException e) {
- // deliberately ignored: onErrorInFetchThread() interrupts us, and the
- // loop condition notices the error (or the cleared running flag)
- }
- }
-
- // make sure any asynchronous error is noticed
- Throwable asyncError = this.error.get();
- if (asyncError != null) {
- throw new Exception(asyncError.getMessage(), asyncError);
- }
- }
- finally {
- // make sure that in any case (completion, abort, error), all spawned threads are stopped
- for (SimpleConsumerThread<?> t : consumers) {
- if (t.isAlive()) {
- t.cancel();
- }
- }
- }
- }
-
- /**
- * Called by a fetch thread when it fails. Only the first reported error is recorded; recording
- * it interrupts the main thread (if one is set), so that the main thread notices the error,
- * aborts, and cancels the remaining fetch threads.
- *
- * @param error The error to report.
- */
- void onErrorInFetchThread(Throwable error) {
- if (!this.error.compareAndSet(null, error)) {
- // an earlier error is already recorded; keep that one
- return;
- }
- if (mainThread != null) {
- mainThread.interrupt();
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Fetch position of one partition: the partition id plus the offset of the next
- * message to request from it. The offset is advanced as messages are consumed.
- */
- private static class FetchPartition {
-
- /** Offset pointing at the next element to read from that partition. */
- long nextOffsetToRead;
-
- /** ID of the partition within the topic (0 indexed, as given by Kafka) */
- int partition;
-
- FetchPartition(int partition, long nextOffsetToRead) {
- this.nextOffsetToRead = nextOffsetToRead;
- this.partition = partition;
- }
-
- @Override
- public String toString() {
- return new StringBuilder("FetchPartition {partition=")
- .append(partition)
- .append(", offset=")
- .append(nextOffsetToRead)
- .append('}')
- .toString();
- }
- }
-
- // ------------------------------------------------------------------------
- // Per broker fetcher
- // ------------------------------------------------------------------------
-
- /**
- * Each broker needs its separate connection. This thread implements the connection to
- * one broker. The connection can fetch multiple partitions from the broker.
- *
- * <p>Errors in {@link #run()} are not thrown out of the thread; they are reported to the
- * owning fetcher via {@code onErrorInFetchThread}. The thread ends after such an error or
- * after {@link #cancel()}.
- *
- * @param <T> The data type fetched.
- */
- private static class SimpleConsumerThread<T> extends Thread {
-
- private final SourceFunction.SourceContext<T> sourceContext;
- private final DeserializationSchema<T> valueDeserializer;
- private final long[] offsetsState;
-
- private final FetchPartition[] partitions;
-
- private final Node broker;
- private final String topic;
- private final Properties config;
-
- private final LegacyFetcher owner;
-
- /** Assigned by this thread in run() and read by cancel() from another thread, hence volatile. */
- private volatile SimpleConsumer consumer;
-
- private volatile boolean running = true;
-
-
- // exceptions are thrown locally
- public SimpleConsumerThread(LegacyFetcher owner,
- Properties config, String topic,
- Node broker,
- FetchPartition[] partitions,
- SourceFunction.SourceContext<T> sourceContext,
- DeserializationSchema<T> valueDeserializer,
- long[] offsetsState) {
- this.owner = owner;
- this.config = config;
- this.topic = topic;
- this.broker = broker;
- this.partitions = partitions;
- this.sourceContext = checkNotNull(sourceContext);
- this.valueDeserializer = checkNotNull(valueDeserializer);
- this.offsetsState = checkNotNull(offsetsState);
- }
-
- @Override
- public void run() {
- try {
- // set up the config values
- final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
-
- // these are the actual configuration values of Kafka + their original default values.
- final int soTimeout = Integer.parseInt(config.getProperty("socket.timeout.ms", "30000"));
- final int bufferSize = Integer.parseInt(config.getProperty("socket.receive.buffer.bytes", "65536"));
- final int fetchSize = Integer.parseInt(config.getProperty("fetch.message.max.bytes", "1048576"));
- final int maxWait = Integer.parseInt(config.getProperty("fetch.wait.max.ms", "100"));
- final int minBytes = Integer.parseInt(config.getProperty("fetch.min.bytes", "1"));
-
- // create the Kafka consumer that we actually use for fetching
- consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
-
- // make sure that all partitions have some offsets to start with.
- // partitions that do not have an offset from a checkpoint get their start offset
- // requested from the broker, according to the 'auto.offset.reset' behavior
- {
- List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
-
- for (FetchPartition fp : partitions) {
- if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
- partitionsToGetOffsetsFor.add(fp);
- }
- }
- if (partitionsToGetOffsetsFor.size() > 0) {
- getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
- LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
- topic, partitionsToGetOffsetsFor);
- }
- }
-
- // Now, the actual work starts :-)
- // number of consecutive fetches that hit an out-of-range offset; reset by every successful fetch
- int offsetOutOfRangeCount = 0;
- while (running) {
- FetchRequestBuilder frb = new FetchRequestBuilder();
- frb.clientId(clientId);
- frb.maxWait(maxWait);
- frb.minBytes(minBytes);
-
- for (FetchPartition fp : partitions) {
- frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
- }
- kafka.api.FetchRequest fetchRequest = frb.build();
- LOG.debug("Issuing fetch request {}", fetchRequest);
-
- FetchResponse fetchResponse = consumer.fetch(fetchRequest);
-
- if (fetchResponse.hasError()) {
- StringBuilder exception = new StringBuilder();
- List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
- for (FetchPartition fp : partitions) {
- short code = fetchResponse.errorCode(topic, fp.partition);
-
- if (code == ErrorMapping.OffsetOutOfRangeCode()) {
- // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
- // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
- partitionsToGetOffsetsFor.add(fp);
- } else if (code != ErrorMapping.NoError()) {
- exception.append("\nException for partition ").append(fp.partition).append(": ")
- .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
- }
- }
- if (partitionsToGetOffsetsFor.size() > 0) {
- // safeguard against an infinite loop: resetting offsets must lead to a successful fetch
- if (offsetOutOfRangeCount++ > 0) {
- throw new RuntimeException("Found invalid offsets more than once in partitions " + partitionsToGetOffsetsFor.toString() + " " +
- "Exceptions: " + exception);
- }
- // get valid offsets for these partitions and try again.
- LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
- getLastOffset(consumer, topic, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
- LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
- continue; // jump back to create a new fetch request. The offset has not been touched.
- } else {
- // at least one partition failed with an error other than an out-of-range offset
- throw new IOException("Error while fetching from broker: " + exception);
- }
- }
-
- // this fetch succeeded, so a later out-of-range error is a new incident, not a reset loop
- offsetOutOfRangeCount = 0;
-
- int messagesInFetch = 0;
- for (FetchPartition fp : partitions) {
- final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
- final int partition = fp.partition;
-
- for (MessageAndOffset msg : messageSet) {
- if (!running) {
- // no longer running
- return;
- }
- messagesInFetch++;
- if (msg.offset() < fp.nextOffsetToRead) {
- // we have seen this message already
- LOG.info("Skipping message with offset {} because we have seen messages until {} from partition {} already",
- msg.offset(), fp.nextOffsetToRead, fp.partition);
- continue;
- }
-
- // NOTE(review): payload() may be null for messages without a value (presumably deletes
- // in compacted topics); that would fail this fetcher with an NPE below -- confirm handling.
- ByteBuffer payload = msg.message().payload();
- byte[] valueByte = new byte[payload.remaining()];
- payload.get(valueByte);
-
- final T value = valueDeserializer.deserialize(valueByte);
- final long offset = msg.offset();
-
- // emit the record and update the checkpointed offset atomically w.r.t. checkpoints
- synchronized (sourceContext.getCheckpointLock()) {
- sourceContext.collect(value);
- offsetsState[partition] = offset;
- }
-
- // advance offset for the next request
- fp.nextOffsetToRead = offset + 1;
- }
- }
- LOG.debug("This fetch contained {} messages", messagesInFetch);
- }
- }
- catch (Throwable t) {
- // report to the main thread
- owner.onErrorInFetchThread(t);
- }
- finally {
- // end of run loop. close connection to consumer
- if (consumer != null) {
- // closing the consumer should not fail the program
- try {
- consumer.close();
- }
- catch (Throwable t) {
- LOG.error("Error while closing the Kafka simple consumer", t);
- }
- }
- }
- }
-
- /**
- * Cancels this fetch thread. The thread will release all resources and terminate.
- * A failure while closing the consumer is logged and does not prevent the interrupt.
- */
- public void cancel() {
- this.running = false;
-
- // interrupt whatever the consumer is doing
- final SimpleConsumer currentConsumer = this.consumer;
- if (currentConsumer != null) {
- try {
- currentConsumer.close();
- }
- catch (Throwable t) {
- LOG.warn("Error while closing the Kafka simple consumer during cancellation", t);
- }
- }
-
- this.interrupt();
- }
-
- /**
- * Request latest offsets for a set of partitions, via a Kafka consumer.
- * The offsets are written into {@code nextOffsetToRead} of the given partitions.
- *
- * @param consumer The consumer connected to lead broker
- * @param topic The topic name
- * @param partitions The list of partitions we need offsets for
- * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
- * @throws RuntimeException Thrown, if the broker reports an error or returns no offset for a partition.
- */
- private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
-
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
- for (FetchPartition fp: partitions) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
- }
-
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
- OffsetResponse response = consumer.getOffsetsBefore(request);
-
- if (response.hasError()) {
- StringBuilder exception = new StringBuilder();
- for (FetchPartition fp: partitions) {
- short code = response.errorCode(topic, fp.partition);
- if (code != ErrorMapping.NoError()) {
- exception.append("\nException for partition ").append(fp.partition).append(": ")
- .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
- }
- }
- throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
- + ". " + exception);
- }
-
- for (FetchPartition fp: partitions) {
- final long[] offsets = response.offsets(topic, fp.partition);
- if (offsets.length == 0) {
- throw new RuntimeException("Broker returned no offset for topic " + topic + " and partition " + fp.partition);
- }
- // one offset was requested per partition (see the request above); it is the next offset to read
- fp.nextOffsetToRead = offsets[0];
- }
- }
-
- /**
- * Maps the configured 'auto.offset.reset' value to the offset time to request from the broker.
- * Both "latest" (name used by the newer consumer configuration) and "largest" (name used by the
- * older high-level consumer) select the latest offset; any other value selects the earliest one.
- */
- private static long getInvalidOffsetBehavior(Properties config) {
- final String resetBehavior = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- if (resetBehavior.equals("latest") || resetBehavior.equals("largest")) {
- return OffsetRequest.LatestTime();
- } else {
- return OffsetRequest.EarliestTime();
- }
- }
- }
-
- /**
- * Thread that looks up the partitions of a topic via {@code FlinkKafkaConsumer.getPartitionsForTopic}.
- * The result, or the failure, is kept until {@link #getPartitions()} collects it.
- */
- private static class PartitionInfoFetcher extends Thread {
-
- private final String topic;
- private final Properties properties;
-
- private volatile List<PartitionInfo> result;
- private volatile Throwable error;
-
-
- PartitionInfoFetcher(String topic, Properties properties) {
- this.topic = topic;
- this.properties = properties;
- }
-
- @Override
- public void run() {
- try {
- result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
- }
- catch (Throwable t) {
- this.error = t;
- }
- }
-
- /**
- * Waits for the lookup thread to finish and returns its result.
- *
- * @return The partitions of the topic.
- * @throws Exception Thrown, if the calling thread is interrupted while waiting (its interrupt
- * status is restored), if the lookup failed, or if the lookup produced no result.
- */
- public List<PartitionInfo> getPartitions() throws Exception {
- try {
- this.join();
- }
- catch (InterruptedException e) {
- // restore the interrupt status so that callers further up can still observe it
- Thread.currentThread().interrupt();
- throw new Exception("Partition fetching was cancelled before completion", e);
- }
-
- if (error != null) {
- throw new Exception("Failed to fetch partitions for topic " + topic, error);
- }
- if (result != null) {
- return result;
- }
- throw new Exception("Partition fetching failed");
- }
- }
-
- /**
- * Daemon thread that waits up to {@code timeout} milliseconds for another thread to finish,
- * and forcibly stops that thread (via the deprecated {@link Thread#stop()}) if it is still
- * alive once the deadline has passed.
- */
- private static class KillerWatchDog extends Thread {
-
- private final Thread toKill;
- private final long timeout;
-
- private KillerWatchDog(Thread toKill, long timeout) {
- super("KillerWatchDog");
- this.toKill = toKill;
- this.timeout = timeout;
- setDaemon(true);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void run() {
- final long deadline = System.currentTimeMillis() + timeout;
-
- while (toKill.isAlive()) {
- final long now = System.currentTimeMillis();
- if (now >= deadline) {
- break;
- }
- try {
- toKill.join(deadline - now);
- }
- catch (InterruptedException ignored) {
- // deliberately ignored: the watchdog keeps waiting until the deadline
- }
- }
-
- // last resort: the watched thread did not finish in time
- if (toKill.isAlive()) {
- toKill.stop();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
deleted file mode 100644
index 2a82561..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The offset handler is responsible for locating the initial partition offsets
- * where the source should start reading, as well as committing offsets from completed
- * checkpoints.
- */
-public interface OffsetHandler {
-
- /**
- * Commits the given offsets for the partitions. May commit the offsets to the Kafka broker,
- * or to ZooKeeper, based on its configured behavior.
- *
- * @param offsetsToCommit The offset to commit, per partition.
- * @throws Exception Thrown, if committing the offsets fails.
- */
- void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
-
- /**
- * Positions the given fetcher to the initial read offsets where the stream consumption
- * will start from.
- *
- * @param partitions The partitions for which to seek the fetcher to their initial read offsets.
- * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
- * @throws Exception Thrown, if the initial offsets cannot be determined or applied.
- */
- void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
-
- /**
- * Closes the offset handler, releasing all resources.
- *
- * @throws IOException Thrown, if the closing fails.
- */
- void close() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
deleted file mode 100644
index a38c3bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties - map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
- *
- * Otherwise (if the user gave a (serializable) partitioner instance), we give Kafka the PartitionerWrapper class of Flink.
- * This class name is set in the key-value (java.util.Properties) map.
- * In addition, Properties.put(Object, Object) is used to store the partitioner instance itself
- * under the key {@link #SERIALIZED_WRAPPER_NAME}.
- * This is a hack because the put() method is called on the underlying Hashmap.
- *
- * Kafka constructs this PartitionerWrapper with the Properties; the constructor extracts the wrapped
- * Partitioner instance from there and all partitioning calls are delegated to it.
- */
-public class PartitionerWrapper implements Partitioner {
- public static final String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
- private final Partitioner wrapped;
-
- /**
- * @param properties The producer properties; must contain a Partitioner under {@link #SERIALIZED_WRAPPER_NAME}.
- * @throws IllegalArgumentException Thrown, if no partitioner instance is stored under that key.
- */
- public PartitionerWrapper(VerifiableProperties properties) {
- final Object instance = properties.props().get(SERIALIZED_WRAPPER_NAME);
- if (instance == null) {
- // fail at construction instead of with an NPE on the first partition() call
- throw new IllegalArgumentException("No partitioner instance found in the properties under key '"
- + SERIALIZED_WRAPPER_NAME + "'");
- }
- wrapped = (Partitioner) instance;
- }
-
- @Override
- public int partition(Object value, int numberOfPartitions) {
- return wrapped.partition(value, numberOfPartitions);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
deleted file mode 100644
index 001b6cb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Simple ZooKeeper serializer for Strings, which are encoded and decoded as UTF-8.
- */
-public class ZooKeeperStringSerializer implements ZkSerializer {
-
- private static final Charset CHARSET = StandardCharsets.UTF_8;
-
- /**
- * Encodes the given String as UTF-8 bytes.
- *
- * @throws IllegalArgumentException Thrown, if {@code data} is not a String (including {@code null}).
- */
- @Override
- public byte[] serialize(Object data) {
- if (data instanceof String) {
- return ((String) data).getBytes(CHARSET);
- }
- else {
- throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
- }
- }
-
- /**
- * Decodes the given UTF-8 bytes into a String; {@code null} input yields {@code null}.
- */
- @Override
- public Object deserialize(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- else {
- return new String(bytes, CHARSET);
- }
- }
-}