Posted to commits@flink.apache.org by se...@apache.org on 2015/05/12 23:03:19 UTC
[08/10] flink git commit: [FLINK-1935] Reimplement PersistentKafkaSource using high level Kafka API
[FLINK-1935] Reimplement PersistentKafkaSource using high level Kafka API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54e95761
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54e95761
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54e95761
Branch: refs/heads/master
Commit: 54e957614c38fed69baf726fc86059e9b11384cb
Parents: 3b2ee23
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Apr 23 12:29:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 21:35:58 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/typeutils/TypeExtractor.java | 3 +-
.../checkpoint/CheckpointCoordinator.java | 2 +-
.../runtime/operators/RegularPactTask.java | 2 +-
.../flink/runtime/state/OperatorState.java | 87 ----
.../flink/runtime/state/StateCheckpoint.java | 59 ---
.../streaming/connectors/ConnectorSource.java | 14 +-
.../connectors/kafka/KafkaConsumerExample.java | 1 -
.../kafka/KafkaSimpleConsumerExample.java | 58 ---
.../flink/streaming/connectors/kafka/Utils.java | 3 +-
.../connectors/kafka/api/KafkaSink.java | 53 ++-
.../connectors/kafka/api/KafkaSource.java | 1 -
.../api/persistent/PersistentKafkaSource.java | 346 ++++++++++++++++
.../kafka/api/simple/KafkaTopicUtils.java | 250 -----------
.../kafka/api/simple/MessageWithMetadata.java | 46 ---
.../kafka/api/simple/PersistentKafkaSource.java | 284 -------------
.../simple/iterator/KafkaConsumerIterator.java | 46 ---
.../iterator/KafkaIdleConsumerIterator.java | 57 ---
.../KafkaMultiplePartitionsIterator.java | 116 ------
.../iterator/KafkaSinglePartitionIterator.java | 392 ------------------
.../api/simple/offset/BeginningOffset.java | 32 --
.../kafka/api/simple/offset/CurrentOffset.java | 32 --
.../kafka/api/simple/offset/GivenOffset.java | 39 --
.../kafka/api/simple/offset/KafkaOffset.java | 87 ----
.../kafka/api/simple/offset/Offset.java | 32 --
.../streaming/connectors/kafka/KafkaITCase.java | 410 +++++++++++++------
.../connectors/kafka/KafkaTopicUtilsTest.java | 152 -------
.../src/test/resources/log4j-test.properties | 2 +-
.../checkpoint/CheckpointedAsynchronously.java | 4 +-
.../api/environment/LocalStreamEnvironment.java | 9 +-
.../environment/StreamExecutionEnvironment.java | 8 +-
.../functions/source/GenericSourceFunction.java | 25 --
.../flink/streaming/api/graph/StreamGraph.java | 19 +-
.../streaming/api/operators/StreamOperator.java | 9 +-
.../streaming/api/state/PartitionableState.java | 66 ---
.../streaming/runtime/tasks/StreamTask.java | 7 +-
.../serialization/DeserializationSchema.java | 4 +-
.../serialization/JavaDefaultStringSchema.java | 7 +
.../streaming/util/serialization/RawSchema.java | 8 +
.../util/serialization/SimpleStringSchema.java | 7 +
.../streaming/api/state/OperatorStateTest.java | 44 --
40 files changed, 711 insertions(+), 2112 deletions(-)
----------------------------------------------------------------------
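For context, a minimal usage sketch of the reimplemented source (illustrative only, not part of this commit's diff; the topic name, addresses, group id, and checkpoint interval are placeholders/assumptions):

    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
    import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

    public class PersistentKafkaSourceUsage {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // completed checkpoints trigger the offset commit to Zookeeper
            env.enableCheckpointing(5000);

            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181"); // required
            props.setProperty("group.id", "my-flink-group");          // required
            props.setProperty("auto.commit.enable", "false");         // the source commits offsets itself

            DataStream<String> stream = env.addSource(
                    new PersistentKafkaSource<String>("myTopic",
                            new JavaDefaultStringSchema(),
                            new ConsumerConfig(props)));
            stream.print();
            env.execute();
        }
    }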
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 0fec67f..2dcb76f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1445,12 +1445,11 @@ public class TypeExtractor {
public static <X> TypeInformation<X> getForObject(X value) {
return new TypeExtractor().privateGetForObject(value);
-
}
+
@SuppressWarnings({ "unchecked", "rawtypes" })
private <X> TypeInformation<X> privateGetForObject(X value) {
Validate.notNull(value);
-
// check if we can extract the types from tuples, otherwise work with the class
if (value instanceof Tuple) {
Tuple t = (Tuple) value;
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b3f6587..be0fcf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -147,7 +147,7 @@ public class CheckpointCoordinator {
return;
}
shutdown = true;
- LOG.info("Stopping checkpoint coordinator jor job " + job);
+ LOG.info("Stopping checkpoint coordinator for job " + job);
// shut down the thread that handles the timeouts
timer.cancel();
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 2bee094..c844d8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -507,7 +507,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// Collect the accumulators of all involved UDFs and send them to the
// JobManager. close() has been called earlier for all involved UDFs
// (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
- // modify accumulators.ll;
+ // modify accumulators;
if (this.stub != null) {
// collect the counters from the stub
if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
deleted file mode 100644
index 8dfd715..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Serializable;
-
-/**
- * Abstract class for representing operator states in Flink programs. By
- * implementing the methods declared in this abstraction the state of the
- * operator can be checkpointed by the fault tolerance mechanism.
- *
- * @param <T>
- * The type of the operator state.
- */
-public class OperatorState<T> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private T stateObject;
-
- /**
- * Initializes the state using the given state object.
- *
- * @param initialState
- * The initial state object
- */
- public OperatorState(T initialState) {
- stateObject = initialState;
- }
-
- /**
- * Returns the currently stored state object.
- *
- * @return The state.
- */
- public T getState() {
- return stateObject;
- }
-
- /**
- * Updates the current state object. States should be only updated using
- * this method to avoid concurrency issues.
- *
- * @param stateUpdate
- * The update applied.
- */
- @SuppressWarnings("unchecked")
- public synchronized void update(Object stateUpdate) {
- this.stateObject = (T) stateUpdate;
- }
-
- /**
- * Creates a {@link StateCheckpoint} that will be used to backup the state
- * for failure recovery. This method will be called by the state
- * checkpointer.
- *
- * @return The {@link StateCheckpoint} created.
- */
- public synchronized StateCheckpoint<T> checkpoint() {
- return new StateCheckpoint<T>(this);
- }
-
- @Override
- public String toString() {
- return stateObject.toString();
- }
-
- public boolean stateEquals(OperatorState<T> other) {
- return stateObject.equals(other.stateObject);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
deleted file mode 100644
index a872e5a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Serializable;
-
-/**
- * Base class for creating checkpoints for {@link OperatorState}. This
- * checkpoints will be used to backup states in stateful Flink operators and
- * also to restore them in case of node failure. To allow incremental
- * checkpoints override the {@link #update(StateCheckpoint)} method.
- *
- * @param <T>
- * The type of the state.
- */
-public class StateCheckpoint<T> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- public OperatorState<T> checkpointedState;
-
- /**
- * Creates a state checkpoint from the given {@link OperatorState}
- *
- * @param operatorState
- * The {@link OperatorState} to checkpoint.
- */
- public StateCheckpoint(OperatorState<T> operatorState) {
- this.checkpointedState = operatorState;
- }
-
- public OperatorState<T> restore() {
- return checkpointedState;
- }
-
- @Override
- public String toString() {
- return checkpointedState.toString();
- }
-
- public boolean stateEquals(StateCheckpoint<T> other) {
- return checkpointedState.equals(other.checkpointedState);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index caabb21..fde4cdf 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -19,13 +19,10 @@ package org.apache.flink.streaming.connectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
- GenericSourceFunction<OUT> {
+public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>{
private static final long serialVersionUID = 1L;
protected DeserializationSchema<OUT> schema;
@@ -35,12 +32,7 @@ public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OU
}
@Override
- public TypeInformation<OUT> getType() {
- if(schema instanceof ResultTypeQueryable) {
- return ((ResultTypeQueryable<OUT>) schema).getProducedType();
- }
- return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
- null, null);
+ public TypeInformation<OUT> getProducedType() {
+ return schema.getProducedType();
}
-
}
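With getType() removed, ConnectorSource now obtains type information directly from the schema via ResultTypeQueryable. A hedged sketch of what a custom DeserializationSchema would then look like (the class name and logic are illustrative):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class UpperCaseStringSchema implements DeserializationSchema<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String deserialize(byte[] message) {
            return new String(message).toUpperCase();
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false; // never signal end of stream
        }

        @Override
        public TypeInformation<String> getProducedType() {
            // picked up by ConnectorSource.getProducedType() above
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }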
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 1bf2962..fe6684d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -38,7 +38,6 @@ public class KafkaConsumerExample {
DataStream<String> kafkaStream = env
.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
-
kafkaStream.print();
env.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
deleted file mode 100644
index d43460b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-
-public class KafkaSimpleConsumerExample {
-
- private static String host;
- private static int port;
- private static String topic;
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
- DataStream<String> kafkaStream = env
- .addSource(new PersistentKafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
-
- kafkaStream.print();
-
- env.execute();
- }
-
- private static boolean parseParameters(String[] args) {
- if (args.length == 3) {
- host = args[0];
- port = Integer.parseInt(args[1]);
- topic = args[2];
- return true;
- } else {
- System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
- return false;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
index a29ba4d..4286196 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -30,7 +29,7 @@ import java.io.IOException;
public class Utils {
public static class TypeInformationSerializationSchema<T>
- implements DeserializationSchema<T>, SerializationSchema<T, byte[]>, ResultTypeQueryable<T> {
+ implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
private final TypeSerializer<T> serializer;
private final TypeInformation<T> ti;
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 85c4d35..0965b29 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
@@ -38,6 +37,7 @@ import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;
+
/**
* Sink that emits its inputs to a Kafka topic.
*
@@ -53,7 +53,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
private Producer<IN, byte[]> producer;
private Properties userDefinedProperties;
private String topicId;
- private String zookeeperAddress;
+ private String brokerList;
private SerializationSchema<IN, byte[]> schema;
private SerializableKafkaPartitioner partitioner;
private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
@@ -62,16 +62,16 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
* Creates a KafkaSink for a given topic. The sink produces its input to
* the topic.
*
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
+ * @param brokerList
+ * Addresses of the brokers
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined serialization schema.
*/
- public KafkaSink(String zookeeperAddress, String topicId,
+ public KafkaSink(String brokerList, String topicId,
SerializationSchema<IN, byte[]> serializationSchema) {
- this(zookeeperAddress, topicId, new Properties(), serializationSchema);
+ this(brokerList, topicId, new Properties(), serializationSchema);
}
/**
@@ -79,8 +79,8 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
* If you use this constructor, the broker should be set with the "metadata.broker.list"
* configuration.
*
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
+ * @param brokerList
+ * Addresses of the brokers
* @param topicId
* ID of the Kafka topic.
* @param producerConfig
@@ -88,13 +88,15 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
* @param serializationSchema
* User defined serialization schema.
*/
- public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+ public KafkaSink(String brokerList, String topicId, Properties producerConfig,
SerializationSchema<IN, byte[]> serializationSchema) {
- NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
+ String[] elements = brokerList.split(",");
+ for(String broker: elements) {
+ NetUtils.ensureCorrectHostnamePort(broker);
+ }
Preconditions.checkNotNull(topicId, "TopicID not set");
- ClosureCleaner.ensureSerializable(partitioner);
- this.zookeeperAddress = zookeeperAddress;
+ this.brokerList = brokerList;
this.topicId = topicId;
this.schema = serializationSchema;
this.partitionerClass = null;
@@ -105,8 +107,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
* Creates a KafkaSink for a given topic. The sink produces its input to
* the topic.
*
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
+ * @param brokerList
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
@@ -114,15 +115,18 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
* @param partitioner
* User defined partitioner.
*/
- public KafkaSink(String zookeeperAddress, String topicId,
+ public KafkaSink(String brokerList, String topicId,
SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
- this(zookeeperAddress, topicId, serializationSchema);
+ this(brokerList, topicId, serializationSchema);
+ ClosureCleaner.ensureSerializable(partitioner);
this.partitioner = partitioner;
}
- public KafkaSink(String zookeeperAddress, String topicId,
- SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitioner) {
- this(zookeeperAddress, topicId, serializationSchema);
+ public KafkaSink(String brokerList,
+ String topicId,
+ SerializationSchema<IN, byte[]> serializationSchema,
+ Class<? extends SerializableKafkaPartitioner> partitioner) {
+ this(brokerList, topicId, serializationSchema);
this.partitionerClass = partitioner;
}
@@ -132,16 +136,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
@Override
public void open(Configuration configuration) {
- KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress);
- String listOfBrokers = kafkaTopicUtils.getBrokerList(topicId);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Broker list: {}", listOfBrokers);
- }
-
Properties properties = new Properties();
- properties.put("metadata.broker.list", listOfBrokers);
+ properties.put("metadata.broker.list", brokerList);
properties.put("request.required.acks", "-1");
properties.put("message.send.max.retries", "10");
@@ -168,7 +165,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
try {
producer = new Producer<IN, byte[]>(config);
} catch (NullPointerException e) {
- throw new RuntimeException("Cannot connect to Kafka broker " + listOfBrokers, e);
+ throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
}
}
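A hedged sketch of calling the changed constructor with a comma-separated broker list instead of the former Zookeeper address (hosts, ports, and topic are placeholders):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
    import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

    public class KafkaSinkUsage {
        public static void attachSink(DataStream<String> stream) {
            // every entry must be a valid host:port pair; the constructor validates each one
            String brokerList = "broker1:9092,broker2:9092";
            stream.addSink(new KafkaSink<String>(brokerList, "myTopic", new JavaDefaultStringSchema()));
        }
    }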
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index a0805c0..28e338c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -47,7 +47,6 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private static final long serialVersionUID = 1L;
- @SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private final String zookeeperAddress;
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
new file mode 100644
index 0000000..50ee15a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.persistent;
+
+import com.google.common.base.Preconditions;
+import kafka.common.TopicAndPartition;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Source for reading from Kafka using Flink Streaming Fault Tolerance.
+ * This source updates the committed offsets in Zookeeper based on Flink's internal checkpointing.
+ *
+ * Note that Kafka's autocommit feature needs to be disabled to use this source.
+ */
+public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements
+ ResultTypeQueryable<OUT>,
+ CheckpointCommitter,
+ CheckpointedAsynchronously<long[]> {
+ private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
+
+ protected transient ConsumerConfig consumerConfig;
+ private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
+ private transient ConsumerConnector consumer;
+
+ private String topicName;
+ private DeserializationSchema<OUT> deserializationSchema;
+ private boolean running = true;
+
+ private transient long[] lastOffsets;
+ private transient ZkClient zkClient;
+ private transient long[] committedOffsets; // maintain the committed offsets, to avoid committing the same offset over and over again.
+
+
+ /**
+ *
+ * In the given consumerConfig, specify at least the "group.id" and "zookeeper.connect" settings.
+ * The config will be passed into the Kafka High Level Consumer.
+ * For a full list of possible values, see https://kafka.apache.org/documentation.html#consumerconfigs
+ */
+ public PersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
+ Preconditions.checkNotNull(topicName);
+ Preconditions.checkNotNull(deserializationSchema);
+ Preconditions.checkNotNull(consumerConfig);
+
+ this.topicName = topicName;
+ this.deserializationSchema = deserializationSchema;
+ this.consumerConfig = consumerConfig;
+ if(consumerConfig.autoCommitEnable()) {
+ throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. " +
+ "This source can only be used with auto commit disabled because the " +
+ "source is committing to zookeeper by itself (not using the KafkaConsumer).");
+ }
+ if(!consumerConfig.offsetsStorage().equals("zookeeper")) {
+ // we can currently only commit to ZK.
+ throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
+ }
+ }
+
+ // ---------------------- ParallelSourceFunction Lifecycle -----------------
+
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ ConsumerConnector consumer = Consumer.createJavaConsumerConnector(this.consumerConfig);
+ // we request only one stream per consumer instance. Kafka will make sure that each consumer group
+ // will see each message only once.
+ Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+ Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
+ if(streams.size() != 1) {
+ throw new RuntimeException("Expected only one message stream but got "+streams.size());
+ }
+ List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+ if(kafkaStreams == null) {
+ throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+ }
+ if(kafkaStreams.size() != 1) {
+ throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
+ }
+ LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, consumerConfig.groupId());
+ this.iteratorToRead = kafkaStreams.get(0).iterator();
+ this.consumer = consumer;
+
+ zkClient = new ZkClient(consumerConfig.zkConnect(),
+ consumerConfig.zkSessionTimeoutMs(),
+ consumerConfig.zkConnectionTimeoutMs(),
+ new KafkaZKStringSerializer());
+
+ // most likely this parallel instance will only receive messages from a subset of the partitions, so some of these offsets will never be used.
+ int numPartitions = getNumberOfPartitions();
+ LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
+ this.lastOffsets = new long[numPartitions];
+ this.committedOffsets = new long[numPartitions];
+ Arrays.fill(this.lastOffsets, -1);
+ Arrays.fill(this.committedOffsets, 0); // just to make it clear
+ }
+
+
+ @Override
+ public void run(Collector<OUT> collector) throws Exception {
+ if(iteratorToRead == null) {
+ throw new RuntimeException("Stream to read not initialized properly. Has open() been called");
+ }
+ try {
+ while (iteratorToRead.hasNext()) {
+ if (!running) {
+ LOG.info("Source got stopped");
+ break;
+ }
+ MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
+ if(lastOffsets[message.partition()] >= message.offset()) {
+ LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
+ continue;
+ }
+ lastOffsets[message.partition()] = message.offset();
+
+ OUT out = deserializationSchema.deserialize(message.message());
+ if (deserializationSchema.isEndOfStream(out)) {
+ LOG.info("DeserializationSchema signaled end of stream for this source");
+ break;
+ }
+
+ collector.collect(out);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
+ }
+ }
+ } catch(Exception ie) {
+ // this exception is coming from Scala code.
+ if(ie instanceof InterruptedException) {
+ if(running) {
+ throw new RuntimeException("Error while reading kafka consumer", ie);
+ } else {
+ LOG.debug("Kafka consumer got interrupted because it has been cancelled. This is expected", ie);
+ }
+ } else {
+ throw ie;
+ }
+ }
+
+ LOG.info("Source has finished reading data from the KafkaStream");
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Instructing source to stop reading data from Kafka");
+ running = false;
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Closing Kafka consumer");
+ this.consumer.shutdown();
+ zkClient.close();
+ }
+
+
+ // ---------------------- State / Checkpoint handling -----------------
+ // this source is keeping the partition offsets in Zookeeper
+
+ private Map<Long, long[]> pendingCheckpoints = new HashMap<Long, long[]>();
+
+ @Override
+ public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ if(lastOffsets == null) {
+ LOG.warn("State snapshot requested on not yet opened source. Returning null");
+ return null;
+ }
+ LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+
+ long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
+ pendingCheckpoints.put(checkpointId, currentOffsets);
+ return currentOffsets;
+ }
+
+ @Override
+ public void restoreState(long[] state) {
+ // the offsets are committed to Zookeeper, so the consumer resumes from there; nothing to restore here.
+ }
+
+
+ /**
+ * Notification on completed checkpoints
+ * @param checkpointId The ID of the checkpoint that has been completed.
+ */
+ @Override
+ public void commitCheckpoint(long checkpointId) {
+ LOG.info("Commit checkpoint {}", checkpointId);
+ long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
+ if(checkpointOffsets == null) {
+ LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
+ return;
+ }
+ LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
+
+ for(int partition = 0; partition < checkpointOffsets.length; partition++) {
+ long offset = checkpointOffsets[partition];
+ if(offset != -1) {
+ setOffset(partition, offset);
+ }
+ }
+ }
+
+ // --------------------- Zookeeper / Offset handling -----------------------------
+
+ private int getNumberOfPartitions() {
+ scala.collection.immutable.List<String> scalaSeq = JavaConversions.asScalaBuffer(Collections.singletonList(topicName)).toList();
+ scala.collection.mutable.Map<String, Seq<Object>> list = ZkUtils.getPartitionsForTopics(zkClient, scalaSeq);
+ Option<Seq<Object>> topicOption = list.get(topicName);
+ if(topicOption.isEmpty()) {
+ throw new IllegalArgumentException("Unable to get number of partitions for topic "+topicName+" from "+list.toString());
+ }
+ Seq<Object> topic = topicOption.get();
+ return topic.size();
+ }
+
+ protected void setOffset(int partition, long offset) {
+ if(committedOffsets[partition] < offset) {
+ setOffset(zkClient, consumerConfig.groupId(), topicName, partition, offset);
+ committedOffsets[partition] = offset;
+ } else {
+ LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+ }
+ }
+
+
+
+ // the following two methods are static to allow access from the outside as well (Testcases)
+
+ /**
+ * This method's code is based on ZookeeperConsumerConnector.commitOffsetToZooKeeper()
+ */
+ public static void setOffset(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+ LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", partition, topic, groupId, offset);
+ TopicAndPartition tap = new TopicAndPartition(topic, partition);
+ ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+ }
+
+ public static long getOffset(ZkClient zkClient, String groupId, String topic, int partition) {
+ TopicAndPartition tap = new TopicAndPartition(topic, partition);
+ ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+ scala.Tuple2<String, Stat> data = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition());
+ return Long.valueOf(data._1());
+ }
+
+
+ // ---------------------- (Java)Serialization methods for the consumerConfig -----------------
+
+ private void writeObject(ObjectOutputStream out)
+ throws IOException, ClassNotFoundException {
+ out.defaultWriteObject();
+ out.writeObject(consumerConfig.props().props());
+ }
+
+ private void readObject(ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ Properties props = (Properties) in.readObject();
+ consumerConfig = new ConsumerConfig(props);
+ }
+
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+
+ // ---------------------- Zookeeper Serializer copied from Kafka (because it has private access there) -----------------
+
+ public static class KafkaZKStringSerializer implements ZkSerializer {
+
+ @Override
+ public byte[] serialize(Object data) throws ZkMarshallingError {
+ try {
+ return ((String) data).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+ if (bytes == null) {
+ return null;
+ } else {
+ try {
+ return new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}
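The static setOffset/getOffset helpers above write to the standard high-level-consumer path in Zookeeper (/consumers/<group>/offsets/<topic>/<partition>), so committed offsets can be inspected from outside the job as well. A small sketch under that assumption (connection string, group id, and topic are placeholders):

    import org.I0Itec.zkclient.ZkClient;
    import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;

    public class OffsetInspection {
        public static void main(String[] args) {
            ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000,
                    new PersistentKafkaSource.KafkaZKStringSerializer());
            try {
                // read the offset the source committed for partition 0 of the topic
                long offset = PersistentKafkaSource.getOffset(zkClient, "my-flink-group", "myTopic", 0);
                System.out.println("Committed offset for partition 0: " + offset);
            } finally {
                zkClient.close();
            }
        }
    }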
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
deleted file mode 100644
index 365961d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.api.TopicMetadata;
-import kafka.cluster.Broker;
-import kafka.common.LeaderNotAvailableException;
-import kafka.common.UnknownTopicOrPartitionException;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-
-/**
- * For retrieving Kafka topic information (e.g. number of partitions),
- * or creating a topic.
- */
-public class KafkaTopicUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtils.class);
-
- private ZkClient zkClient;
-
- public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10000;
- public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10000;
-
- private final String zookeeperAddress;
- private final int sessionTimeoutMs;
- private final int connectionTimeoutMs;
-
- private volatile boolean isRunning = false;
-
- public KafkaTopicUtils(String zookeeperServer) {
- this(zookeeperServer, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
- }
-
- public KafkaTopicUtils(String zookeeperAddress, int sessionTimeoutMs, int connectionTimeoutMs) {
- this.zookeeperAddress = zookeeperAddress;
- this.sessionTimeoutMs = sessionTimeoutMs;
- this.connectionTimeoutMs = connectionTimeoutMs;
- }
-
- public void createTopic(String topicName, int numOfPartitions, int replicationFactor) {
-
- LOG.info("Creating Kafka topic '{}'", topicName);
- Properties topicConfig = new Properties();
- if (topicExists(topicName)) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
- }
- } else {
- LOG.info("Connecting zookeeper");
-
- initZkClient();
- AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
- closeZkClient();
- }
- }
-
- public String getBrokerList(String topicName) {
- return getBrokerAddressList(getBrokerAddresses(topicName));
- }
-
- public String getBrokerList(String topicName, int partitionId) {
- return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
- }
-
- public Set<String> getBrokerAddresses(String topicName) {
- int numOfPartitions = getNumberOfPartitions(topicName);
-
- HashSet<String> brokers = new HashSet<String>();
- for (int i = 0; i < numOfPartitions; i++) {
- brokers.addAll(getBrokerAddresses(topicName, i));
- }
- return brokers;
- }
-
- public Set<String> getBrokerAddresses(String topicName, int partitionId) {
- PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
- Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
-
- HashSet<String> addresses = new HashSet<String>();
- for (Broker broker : inSyncReplicas) {
- addresses.add(broker.connectionString());
- }
- return addresses;
- }
-
- private static String getBrokerAddressList(Set<String> brokerAddresses) {
- StringBuilder brokerAddressList = new StringBuilder("");
- for (String broker : brokerAddresses) {
- brokerAddressList.append(broker);
- brokerAddressList.append(',');
- }
- brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
-
- return brokerAddressList.toString();
- }
-
- public int getNumberOfPartitions(String topicName) {
- Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
- return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
- }
-
- public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
- isRunning = true;
- PartitionMetadata partitionMetadata = null;
- while (isRunning) {
- try {
- partitionMetadata = getPartitionMetadata(topicName, partitionId);
- return partitionMetadata;
- } catch (LeaderNotAvailableException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got {} trying to fetch metadata again", e.getMessage());
- }
- }
- }
- isRunning = false;
- return partitionMetadata;
- }
-
- public PartitionMetadata getPartitionMetadata(String topicName, int partitionId) {
- PartitionMetadata partitionMetadata = getPartitionMetadataWithErrorCode(topicName, partitionId);
- switch (partitionMetadata.errorCode()) {
- case 0:
- return partitionMetadata;
- case 3:
- throw new UnknownTopicOrPartitionException("While fetching metadata for " + topicName + " / " + partitionId);
- case 5:
- throw new LeaderNotAvailableException("While fetching metadata for " + topicName + " / " + partitionId);
- default:
- throw new RuntimeException("Unknown error occurred while fetching metadata for "
- + topicName + " / " + partitionId + ", with error code: " + partitionMetadata.errorCode());
- }
- }
-
- private PartitionMetadata getPartitionMetadataWithErrorCode(String topicName, int partitionId) {
- TopicMetadata topicInfo = getTopicMetadata(topicName);
-
- Collection<PartitionMetadata> partitions = JavaConversions.asJavaCollection(topicInfo.partitionsMetadata());
-
- Iterator<PartitionMetadata> iterator = partitions.iterator();
- for (PartitionMetadata partition : partitions) {
- if (partition.partitionId() == partitionId) {
- return partition;
- }
- }
-
- throw new RuntimeException("No such partition: " + topicName + " / " + partitionId);
- }
-
- public TopicMetadata getTopicMetadata(String topicName) {
- TopicMetadata topicMetadata = getTopicMetadataWithErrorCode(topicName);
- switch (topicMetadata.errorCode()) {
- case 0:
- return topicMetadata;
- case 3:
- throw new UnknownTopicOrPartitionException("While fetching metadata for topic " + topicName);
- case 5:
- throw new LeaderNotAvailableException("While fetching metadata for topic " + topicName);
- default:
- throw new RuntimeException("Unknown error occurred while fetching metadata for topic "
- + topicName + ", with error code: " + topicMetadata.errorCode());
- }
- }
-
- private TopicMetadata getTopicMetadataWithErrorCode(String topicName) {
- if (topicExists(topicName)) {
- initZkClient();
- TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
- closeZkClient();
-
- return topicMetadata;
- } else {
- throw new RuntimeException("Topic does not exist: " + topicName);
- }
- }
-
- public boolean topicExists(String topicName) {
- initZkClient();
- boolean topicExists = AdminUtils.topicExists(zkClient, topicName);
- closeZkClient();
-
- return topicExists;
- }
-
- private void initZkClient() {
- zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs,
- new KafkaZKStringSerializer());
- zkClient.waitUntilConnected();
- }
-
- private void closeZkClient() {
- zkClient.close();
- zkClient = null;
- }
-
- private static class KafkaZKStringSerializer implements ZkSerializer {
-
- @Override
- public byte[] serialize(Object data) throws ZkMarshallingError {
- try {
- return ((String) data).getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError {
- if (bytes == null) {
- return null;
- } else {
- try {
- return new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
deleted file mode 100644
index 3985350..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-/**
- * POJO encapsulating records received from Kafka with their offset and partition.
- */
-public class MessageWithMetadata {
-
- private int partition;
- private long offset;
- private byte[] message;
-
- public MessageWithMetadata(long offset, byte[] message, int partition) {
- this.partition = partition;
- this.offset = offset;
- this.message = message;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public byte[] getMessage() {
- return message;
- }
-
- public int getPartition() {
- return partition;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
deleted file mode 100644
index a842160..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import com.google.common.base.Preconditions;
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
-import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
-import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaMultiplePartitionsIterator;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Kafka source persisting its offset through the {@link OperatorState} interface.
- * This allows the offset to be restored to the latest one that has been acknowledged
- * by the whole execution graph.
- *
- * @param <OUT>
- * Type of the messages on the topic.
- */
-public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> implements Checkpointed<HashMap<Integer, KafkaOffset>> {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
-
- public static final String WAIT_ON_EMPTY_FETCH_KEY = "flink.waitOnEmptyFetchMillis";
- public static final String WAIT_ON_FAILED_LEADER_MS_KEY = "flink.waitOnFailedLeaderDetection";
- public static final int WAIT_ON_FAILED_LEADER__MS_DEFAULT = 2000;
-
- public static final String MAX_FAILED_LEADER_RETRIES_KEY = "flink.maxLeaderDetectionRetries";
- public static final int MAX_FAILED_LEADER_RETRIES_DEFAULT = 3;
-
- private final String topicId;
- private final KafkaOffset startingOffset;
- private transient ConsumerConfig consumerConfig; // ConsumerConfig is not serializable.
-
- private transient KafkaConsumerIterator iterator;
-
- private transient Map<Integer, KafkaOffset> partitionOffsets;
-
- /**
- * Creates a persistent Kafka source that consumes a topic.
- * If there are no new messages on the topic, this consumer will wait
- * 100 milliseconds before trying to fetch messages again.
- * The consumer will start consuming from the latest messages in the topic.
- *
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
- * @param topicId
- * ID of the Kafka topic.
- * @param deserializationSchema
- * User defined deserialization schema.
- */
- public PersistentKafkaSource(String zookeeperAddress, String topicId,
- DeserializationSchema<OUT> deserializationSchema) {
- this(zookeeperAddress, topicId, deserializationSchema, KafkaTopicUtils.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS, 100);
- }
-
- /**
- * Creates a persistent Kafka source that consumes a topic.
- * If there are no new messages on the topic, this consumer will wait
- * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
- * The consumer will start consuming from the latest messages in the topic.
- *
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
- * @param topicId
- * ID of the Kafka topic.
- * @param deserializationSchema
- * User defined deserialization schema.
- * @param zookeeperSyncTimeMillis
- * Synchronization time with zookeeper.
- * @param waitOnEmptyFetchMillis
- * Time to wait before trying to fetch new messages again.
- */
- public PersistentKafkaSource(String zookeeperAddress, String topicId,
- DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
- this(zookeeperAddress, topicId, deserializationSchema, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis, Offset.FROM_CURRENT);
- }
-
-
- /**
- * Creates a persistent Kafka source that consumes a topic.
- * If there are no new messages on the topic, this consumer will wait
- * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
- *
- * This constructor is deprecated: use the constructor taking a {@link ConsumerConfig} instead.
- *
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
- * @param topicId
- * ID of the Kafka topic.
- * @param deserializationSchema
- * User defined deserialization schema.
- * @param zookeeperSyncTimeMillis
- * Synchronization time with zookeeper.
- * @param waitOnEmptyFetchMillis
- * Time to wait before trying to fetch new messages again.
- * @param startOffsetType
- * The offset to start from (beginning or current).
- */
- @Deprecated
- public PersistentKafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis, Offset startOffsetType) {
- this(topicId, deserializationSchema, startOffsetType, legacyParametersToConsumerConfig(zookeeperAddress, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis));
- }
-
- private static ConsumerConfig legacyParametersToConsumerConfig(String zookeeperAddress, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
- Properties props = new Properties();
- props.setProperty("zookeeper.sync.time.ms", Integer.toString(zookeeperSyncTimeMillis));
- props.setProperty(WAIT_ON_EMPTY_FETCH_KEY, Integer.toString(waitOnEmptyFetchMillis));
- props.setProperty("zookeeper.connect", zookeeperAddress);
- props.setProperty("group.id", "flink-persistent-kafka-source");
- return new ConsumerConfig(props);
- }
-
- /**
- * Creates a persistent Kafka source that consumes a topic.
- * If there are no new messages on the topic, this consumer will wait
- * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
- *
- * @param topicId
- * ID of the Kafka topic.
- * @param deserializationSchema
- * User defined deserialization schema.
- * @param startOffsetType
- * The offset to start from (beginning or current).
- * @param consumerConfig
- * Additional configuration for the PersistentKafkaSource.
- * NOTE: This source respects only certain values from the config, e.g. the ZooKeeper
- * connection settings, fetch.wait.max.ms, and the flink.* keys defined above.
- */
- public PersistentKafkaSource(String topicId, DeserializationSchema<OUT> deserializationSchema, Offset startOffsetType, ConsumerConfig consumerConfig) {
- super(deserializationSchema);
- Preconditions.checkNotNull(topicId, "The topic id can not be null");
- Preconditions.checkNotNull(deserializationSchema, "The deserialization schema can not be null");
- Preconditions.checkNotNull(consumerConfig, "ConsumerConfig can not be null");
-
- this.consumerConfig = consumerConfig;
-
- this.topicId = topicId;
-
- switch (startOffsetType) {
- case FROM_BEGINNING:
- this.startingOffset = new BeginningOffset();
- break;
- case FROM_CURRENT:
- default:
- this.startingOffset = new CurrentOffset();
- break;
- }
- }
-
- // ---------------------- Source lifecycle methods (open / run / cancel) -----------------
-
- @SuppressWarnings("unchecked")
- @Override
- public void open(Configuration parameters) throws InterruptedException {
- LOG.info("Starting PersistentKafkaSource");
- StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- int indexOfSubtask = context.getIndexOfThisSubtask();
- int numberOfSubtasks = context.getNumberOfParallelSubtasks();
-
- KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(consumerConfig.zkConnect(), consumerConfig.zkSyncTimeMs(), consumerConfig.zkConnectionTimeoutMs());
-
- int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
-
- if (indexOfSubtask >= numberOfPartitions) {
- LOG.info("Creating idle consumer because this subtask ({}) is higher than the number partitions ({})", indexOfSubtask + 1, numberOfPartitions);
- iterator = new KafkaIdleConsumerIterator();
- }
- else {
- if (partitionOffsets != null) {
- // we have restored state
- LOG.info("Initializing PersistentKafkaSource from existing state.");
- }
- else {
- LOG.info("No existing state found. Creating new");
- partitionOffsets = new HashMap<Integer, KafkaOffset>();
-
- // assign partitions round-robin: subtask i reads partitions i, i + parallelism, i + 2 * parallelism, ...
- for (int partitionIndex = indexOfSubtask; partitionIndex < numberOfPartitions; partitionIndex += numberOfSubtasks) {
- partitionOffsets.put(partitionIndex, startingOffset);
- }
- }
-
- iterator = new KafkaMultiplePartitionsIterator(topicId, partitionOffsets, kafkaTopicUtils, this.consumerConfig);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("PersistentKafkaSource ({}/{}) listening to partitionOffsets {} of topic {}.",
- indexOfSubtask + 1, numberOfSubtasks, partitionOffsets.keySet(), topicId);
- }
- }
-
- iterator.initialize();
- }
-
- @Override
- public void run(Collector<OUT> collector) throws Exception {
- MessageWithMetadata msg;
- while (iterator.hasNext()) {
- msg = iterator.nextWithOffset();
- OUT out = schema.deserialize(msg.getMessage());
-
- if (schema.isEndOfStream(out)) {
- break;
- }
-
- collector.collect(out);
-
- // TODO avoid object creation
- // snapshotState() copies this map on each checkpoint, so updating it here is sufficient
- partitionOffsets.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
- }
- }
-
- @Override
- public void cancel() {
- LOG.info("PersistentKafkaSource has been cancelled");
- }
-
-
-
- // ---------------------- (Java) serialization methods for the consumerConfig -----------------
-
- private void writeObject(ObjectOutputStream out)
- throws IOException, ClassNotFoundException {
- out.defaultWriteObject();
- // ConsumerConfig is not serializable, so write out only its backing Properties
- out.writeObject(consumerConfig.props().props());
- }
-
- private void readObject(ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- // rebuild the ConsumerConfig from the Properties written by writeObject()
- Properties props = (Properties) in.readObject();
- consumerConfig = new ConsumerConfig(props);
- }
-
- @Override
- public HashMap<Integer, KafkaOffset> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- // hand out a copy, so offsets updated by run() after this call do not leak into the checkpoint
- return new HashMap<Integer, KafkaOffset>(this.partitionOffsets);
- }
-
- @Override
- public void restoreState(HashMap<Integer, KafkaOffset> state) {
- this.partitionOffsets = state;
- }
-}
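For reference, a job would wire in this source roughly as follows -- a minimal sketch, where the topic name, ZooKeeper address, and consumer group id are placeholders:

    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper address
    props.setProperty("group.id", "my-flink-group");          // placeholder consumer group
    ConsumerConfig consumerConfig = new ConsumerConfig(props);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new PersistentKafkaSource<String>(
            "myTopic", new SimpleStringSchema(), Offset.FROM_CURRENT, consumerConfig))
        .print();

On a checkpoint, snapshotState() hands Flink a copy of the per-partition offset map; after a failure, restoreState() puts that map back and open() resumes from it instead of the configured starting offset.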
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
deleted file mode 100644
index 42fe003..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-
-/**
- * Iterator interface for different types of Kafka consumers.
- */
-public interface KafkaConsumerIterator {
-
- void initialize() throws InterruptedException;
-
- boolean hasNext();
-
- /**
- * Returns the next message received from Kafka as a
- * byte array.
- *
- * @return next message as a byte array.
- */
- byte[] next() throws InterruptedException;
-
- /**
- * Returns the next message and its offset received from
- * Kafka encapsulated in a POJO.
- *
- * @return next message and its offset.
- */
- MessageWithMetadata nextWithOffset() throws InterruptedException;
-}
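The contract is meant to be consumed with the loop pattern from PersistentKafkaSource.run() above -- a sketch, where process() is a hypothetical user function:

    KafkaConsumerIterator iterator = ...; // e.g. a KafkaMultiplePartitionsIterator, see below
    iterator.initialize();
    while (iterator.hasNext()) {
        MessageWithMetadata msg = iterator.nextWithOffset();
        process(msg.getMessage(), msg.getPartition(), msg.getOffset()); // hypothetical handler
    }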
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
deleted file mode 100644
index 1935118..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * No-op iterator. Used when there are more source tasks than Kafka partitions.
- */
-public class KafkaIdleConsumerIterator implements KafkaConsumerIterator {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaIdleConsumerIterator.class);
-
- public KafkaIdleConsumerIterator() {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Idle Kafka consumer created. The subtask does nothing.");
- }
- }
-
-
- @Override
- public void initialize() throws InterruptedException {
-
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
-
- @Override
- public byte[] next() {
- throw new RuntimeException("Idle consumer has no input.");
- }
-
- @Override
- public MessageWithMetadata nextWithOffset() {
- throw new RuntimeException("Idle consumer has no input.");
- }
-}
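A quick worked example of when this iterator is chosen: with 4 source subtasks reading a topic with 2 partitions, the check indexOfSubtask >= numberOfPartitions in PersistentKafkaSource.open() is true for subtasks 2 and 3, so they receive a KafkaIdleConsumerIterator and their run() loop exits immediately because hasNext() returns false.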
http://git-wip-us.apache.org/repos/asf/flink/blob/54e95761/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
deleted file mode 100644
index daebaaf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import kafka.consumer.ConsumerConfig;
-import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
-import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Iterator over multiple Kafka partitions.
- *
- * This is needed when the number of partitions exceeds the number of Kafka source tasks.
- */
-public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
-
- protected List<KafkaSinglePartitionIterator> partitions;
- protected final ConsumerConfig consumerConfig;
-
- public KafkaMultiplePartitionsIterator(String topic,
- Map<Integer, KafkaOffset> partitionsWithOffset,
- KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
- partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
-
- this.consumerConfig = consumerConfig;
-
- for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
- partitions.add(new KafkaSinglePartitionIterator(
- topic,
- partitionWithOffset.getKey(),
- partitionWithOffset.getValue(),
- kafkaTopicUtils,
- this.consumerConfig));
- }
- }
-
- @Override
- public void initialize() throws InterruptedException {
- LOG.info("Initializing iterator with {} partitions", partitions.size());
- StringBuilder partInfo = new StringBuilder();
- for (KafkaSinglePartitionIterator partition : partitions) {
- partition.initialize();
- partInfo.append(partition.toString()).append(" ");
- }
- LOG.info("Initialized partitions {}", partInfo);
- }
-
- @Override
- public boolean hasNext() {
- // nextWithOffset() blocks until a message arrives, so this iterator never reports exhaustion
- return true;
- }
-
- @Override
- public byte[] next() throws InterruptedException {
- return nextWithOffset().getMessage();
- }
-
- protected int lastCheckedPartitionIndex = -1;
- private boolean gotNewMessage = false;
-
- @Override
- public MessageWithMetadata nextWithOffset() throws InterruptedException {
- while (true) {
- // probe each partition once, resuming after the last partition that delivered a message
- for (int probed = 0; probed < partitions.size(); probed++) {
- lastCheckedPartitionIndex = nextPartition(lastCheckedPartitionIndex);
- KafkaSinglePartitionIterator partition = partitions.get(lastCheckedPartitionIndex);
-
- if (partition.fetchHasNext()) {
- gotNewMessage = true;
- return partition.nextWithOffset();
- }
- }
-
- // do not wait if a new message has been fetched recently
- if (!gotNewMessage) {
- try {
- Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY, consumerConfig.fetchWaitMaxMs()));
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for new messages", e);
- }
- }
-
- gotNewMessage = false;
- }
- }
-
- protected int nextPartition(int currentPartition) {
- return (currentPartition + 1) % partitions.size();
- }
-}
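To make the wiring concrete, a sketch of how PersistentKafkaSource.open() builds this iterator -- assuming a subtask that owns partitions 0 and 2, with "myTopic" as a placeholder and kafkaTopicUtils/consumerConfig set up as in open():

    Map<Integer, KafkaOffset> partitionsWithOffset = new HashMap<Integer, KafkaOffset>();
    partitionsWithOffset.put(0, new BeginningOffset());
    partitionsWithOffset.put(2, new BeginningOffset());

    KafkaConsumerIterator iterator = new KafkaMultiplePartitionsIterator(
            "myTopic", partitionsWithOffset, kafkaTopicUtils, consumerConfig);
    iterator.initialize(); // opens one KafkaSinglePartitionIterator per owned partition

Each call to nextWithOffset() then probes the owned partitions round-robin and backs off only after a full pass yields no data.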