You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/01 18:51:42 UTC
flink git commit: [FLINK-1811] Allow passing custom buffer sizes and
timeouts to PersistentKafkaSource
Repository: flink
Updated Branches:
refs/heads/master 015af177d -> 79a92a647
[FLINK-1811] Allow passing custom buffer sizes and timeouts to PersistentKafkaSource
This closes #558
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79a92a64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79a92a64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79a92a64
Branch: refs/heads/master
Commit: 79a92a6478a71c83d3f75e59bb7f9ca431b513d7
Parents: 015af17
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Apr 1 15:16:17 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Apr 1 17:57:39 2015 +0200
----------------------------------------------------------------------
.../connectors/kafka/api/KafkaSource.java | 26 ++++++++++++++++++--
.../kafka/api/simple/PersistentKafkaSource.java | 21 +++++++++++++++-
.../KafkaMultiplePartitionsIterator.java | 6 +++--
.../iterator/KafkaSinglePartitionIterator.java | 12 ++++++---
4 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/79a92a64/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 1aa834d..4a6da3b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import com.google.common.base.Preconditions;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
@@ -52,6 +53,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private final String zookeeperAddress;
private final String groupId;
private final String topicId;
+ private Properties customProperties;
private transient ConsumerConnector consumer;
private transient ConsumerIterator<byte[], byte[]> consumerIterator;
@@ -75,13 +77,24 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
* User defined deserialization schema.
* @param zookeeperSyncTimeMillis
* Synchronization time with zookeeper.
+ * @param customProperties
+ * Custom properties for Kafka
*/
- public KafkaSource(String zookeeperAddress, String topicId, String groupId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
+ public KafkaSource(String zookeeperAddress,
+ String topicId, String groupId,
+ DeserializationSchema<OUT> deserializationSchema,
+ long zookeeperSyncTimeMillis, Properties customProperties) {
super(deserializationSchema);
+ Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
+ Preconditions.checkNotNull(topicId, "Topic ID is null");
+ Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
+ Preconditions.checkArgument(zookeeperSyncTimeMillis >= 0, "The ZK sync time must be non-negative");
+
this.zookeeperAddress = zookeeperAddress;
this.groupId = groupId;
this.topicId = topicId;
this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
+ this.customProperties = customProperties;
}
/**
@@ -97,7 +110,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
* Synchronization time with zookeeper.
*/
public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
- this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis);
+ this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis, null);
}
/**
* Creates a KafkaSource that consumes a topic.
@@ -124,6 +137,15 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
props.put("auto.commit.interval.ms", "1000");
+ if(customProperties != null) {
+ for(Map.Entry<Object, Object> e : customProperties.entrySet()) {
+ if(props.containsKey(e.getKey())) {
+ LOG.warn("Overwriting property "+e.getKey()+" with value "+e.getValue());
+ }
+ props.put(e.getKey(), e.getValue());
+ }
+ }
+
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
http://git-wip-us.apache.org/repos/asf/flink/blob/79a92a64/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 97225dc..5ca6a58 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.api.simple;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.base.Preconditions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
@@ -57,6 +58,9 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
private final int waitOnEmptyFetchMillis;
private final KafkaOffset startingOffset;
+ private int connectTimeoutMs = 100000;
+ private int bufferSize = 64 * 1024;
+
private transient KafkaConsumerIterator iterator;
private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
@@ -124,6 +128,11 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis,
int waitOnEmptyFetchMillis, Offset startOffsetType) {
super(deserializationSchema);
+ Preconditions.checkNotNull(zookeeperAddress, "The Zookeeper address can not be null");
+ Preconditions.checkNotNull(topicId, "The topic id can not be null");
+ Preconditions.checkNotNull(deserializationSchema, "The deserialization schema can not be null");
+ Preconditions.checkArgument(zookeeperSyncTimeMillis > 0, "The sync time must be positive");
+ Preconditions.checkArgument(waitOnEmptyFetchMillis > 0, "The wait time must be positive");
this.topicId = topicId;
this.zookeeperServerAddress = zookeeperAddress;
@@ -189,7 +198,7 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
}
protected KafkaConsumerIterator getMultiKafkaIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
- return new KafkaMultiplePartitionsIterator(hostName, topic, partitionsWithOffset, waitOnEmptyFetch);
+ return new KafkaMultiplePartitionsIterator(hostName, topic, partitionsWithOffset, waitOnEmptyFetch, this.connectTimeoutMs, this.bufferSize);
}
@Override
@@ -211,6 +220,16 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
}
}
+ public void setConnectTimeoutMs(int connectTimeoutMs) {
+ Preconditions.checkArgument(connectTimeoutMs > 0, "The timeout must be positive");
+ this.connectTimeoutMs = connectTimeoutMs;
+ }
+
+ public void setBufferSize(int bufferSize) {
+ Preconditions.checkArgument(bufferSize > 0, "The buffer size must be positive");
+ this.bufferSize = bufferSize;
+ }
+
@Override
public void cancel() {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79a92a64/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
index 420c6db..40e1ff2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
@@ -33,7 +33,9 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
protected List<KafkaSinglePartitionIterator> partitions;
protected final int waitOnEmptyFetch;
- public KafkaMultiplePartitionsIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
+ public KafkaMultiplePartitionsIterator(String hostName, String topic,
+ Map<Integer, KafkaOffset> partitionsWithOffset,
+ int waitOnEmptyFetch, int connectTimeoutMs, int bufferSize) {
partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
String[] hostAndPort = hostName.split(":");
@@ -49,7 +51,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
port,
topic,
partitionWithOffset.getKey(),
- partitionWithOffset.getValue()));
+ partitionWithOffset.getValue(), connectTimeoutMs, bufferSize));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79a92a64/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
index 4c1468a..cf49e43 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
@@ -60,6 +60,8 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
private List<String> replicaBrokers;
private String clientName;
private String leadBroker;
+ private final int connectTimeoutMs;
+ private final int bufferSize;
private KafkaOffset initialOffset;
private transient Iterator<MessageAndOffset> iter;
@@ -74,11 +76,13 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
* @param topic Name of the topic to listen to
* @param partition Partition in the chosen topic
*/
- public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset) {
+ public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset,
+ int connectTimeoutMs, int bufferSize) {
this.hosts = new ArrayList<String>();
hosts.add(hostName);
this.port = port;
-
+ this.connectTimeoutMs = connectTimeoutMs;
+ this.bufferSize = bufferSize;
this.topic = topic;
this.partition = partition;
@@ -114,7 +118,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
leadBroker = metadata.leader().host();
clientName = "Client_" + topic + "_" + partition;
- consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+ consumer = new SimpleConsumer(leadBroker, port, connectTimeoutMs, bufferSize, clientName);
readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
@@ -236,7 +240,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
for (String seed : a_hosts) {
SimpleConsumer consumer = null;
try {
- consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
+ consumer = new SimpleConsumer(seed, a_port, connectTimeoutMs, bufferSize, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);