Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/01 18:51:42 UTC

flink git commit: [FLINK-1811] Allow passing custom buffer sizes and timeouts to PersistentKafkaSource

Repository: flink
Updated Branches:
  refs/heads/master 015af177d -> 79a92a647


[FLINK-1811] Allow passing custom buffer sizes and timeouts to PersistentKafkaSource

This closes #558
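
For anyone reading this on the list: a minimal usage sketch of the new setters.
The constructor arguments, SimpleStringSchema, and Offset.FROM_CURRENT are
illustrative assumptions, not values taken from this commit.

    PersistentKafkaSource<String> source = new PersistentKafkaSource<String>(
            "localhost:2181",             // ZooKeeper address
            "myTopic",                    // topic id
            new SimpleStringSchema(),     // any DeserializationSchema<String>
            5000,                         // zookeeperSyncTimeMillis
            100,                          // waitOnEmptyFetchMillis
            Offset.FROM_CURRENT);         // assumed constant of the Offset enum
    source.setConnectTimeoutMs(30000);    // was hard-coded to 100000 ms
    source.setBufferSize(128 * 1024);     // was hard-coded to 64 * 1024 bytes

The values are handed to the underlying SimpleConsumer when the per-partition
iterators are created, so the setters should be called before the job runs.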


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79a92a64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79a92a64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79a92a64

Branch: refs/heads/master
Commit: 79a92a6478a71c83d3f75e59bb7f9ca431b513d7
Parents: 015af17
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Apr 1 15:16:17 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Apr 1 17:57:39 2015 +0200

----------------------------------------------------------------------
 .../connectors/kafka/api/KafkaSource.java       | 26 ++++++++++++++++++--
 .../kafka/api/simple/PersistentKafkaSource.java | 21 +++++++++++++++-
 .../KafkaMultiplePartitionsIterator.java        |  6 +++--
 .../iterator/KafkaSinglePartitionIterator.java  | 12 ++++++---
 4 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79a92a64/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 1aa834d..4a6da3b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import com.google.common.base.Preconditions;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -52,6 +53,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	private final String zookeeperAddress;
 	private final String groupId;
 	private final String topicId;
+	private Properties customProperties;
 
 	private transient ConsumerConnector consumer;
 	private transient ConsumerIterator<byte[], byte[]> consumerIterator;
@@ -75,13 +77,24 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	 *            User defined deserialization schema.
 	 * @param zookeeperSyncTimeMillis
 	 *            Synchronization time with zookeeper.
+	 * @param customProperties
+	 * 			  Custom properties for the Kafka consumer; these override the defaults set by this source
 	 */
-	public KafkaSource(String zookeeperAddress, String topicId, String groupId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
+	public KafkaSource(String zookeeperAddress,
+					String topicId, String groupId,
+					DeserializationSchema<OUT> deserializationSchema,
+					long zookeeperSyncTimeMillis, Properties customProperties) {
 		super(deserializationSchema);
+		Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
+		Preconditions.checkNotNull(topicId, "Topic ID is null");
+		Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
+		Preconditions.checkArgument(zookeeperSyncTimeMillis >= 0, "The ZK sync time must be non-negative");
+
 		this.zookeeperAddress = zookeeperAddress;
 		this.groupId = groupId;
 		this.topicId = topicId;
 		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
+		this.customProperties = customProperties;
 	}
 
 	/**
@@ -97,7 +110,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	 *            Synchronization time with zookeeper.
 	 */
 	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
-		this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis);
+		this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis, null);
 	}
 	/**
 	 * Creates a KafkaSource that consumes a topic.
@@ -124,6 +137,15 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 		props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
 		props.put("auto.commit.interval.ms", "1000");
 
+		if (customProperties != null) {
+			for (Map.Entry<Object, Object> e : customProperties.entrySet()) {
+				if (props.containsKey(e.getKey())) {
+					LOG.warn("Overwriting property " + e.getKey() + " with value " + e.getValue());
+				}
+				props.put(e.getKey(), e.getValue());
+			}
+		}
+
 		consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
 
 		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer

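A hedged sketch of the new customProperties constructor above; the property key
and schema class are illustrative examples, not defaults from this commit:

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("fetch.message.max.bytes", "2097152"); // example consumer setting
    KafkaSource<String> source = new KafkaSource<String>(
            "localhost:2181",             // ZooKeeper address
            "myTopic", "myGroup",         // topic id and consumer group
            new SimpleStringSchema(),     // any DeserializationSchema<String>
            5000,                         // zookeeperSyncTimeMillis
            kafkaProps);

Keys in kafkaProps that collide with the defaults set in the snippet above
overwrite them; a warning is logged for each overwritten key.
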
http://git-wip-us.apache.org/repos/asf/flink/blob/79a92a64/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 97225dc..5ca6a58 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.api.simple;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
@@ -57,6 +58,9 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 	private final int waitOnEmptyFetchMillis;
 	private final KafkaOffset startingOffset;
 
+	private int connectTimeoutMs = 100000;
+	private int bufferSize = 64 * 1024;
+
 	private transient KafkaConsumerIterator iterator;
 	private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
 
@@ -124,6 +128,11 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 			DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis,
 			int waitOnEmptyFetchMillis, Offset startOffsetType) {
 		super(deserializationSchema);
+		Preconditions.checkNotNull(zookeeperAddress, "The Zookeeper address cannot be null");
+		Preconditions.checkNotNull(topicId, "The topic id cannot be null");
+		Preconditions.checkNotNull(deserializationSchema, "The deserialization schema cannot be null");
+		Preconditions.checkArgument(zookeeperSyncTimeMillis > 0, "The sync time must be positive");
+		Preconditions.checkArgument(waitOnEmptyFetchMillis > 0, "The wait time must be positive");
 
 		this.topicId = topicId;
 		this.zookeeperServerAddress = zookeeperAddress;
@@ -189,7 +198,7 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 	}
 
 	protected KafkaConsumerIterator getMultiKafkaIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
-		return new KafkaMultiplePartitionsIterator(hostName, topic, partitionsWithOffset, waitOnEmptyFetch);
+		return new KafkaMultiplePartitionsIterator(hostName, topic, partitionsWithOffset, waitOnEmptyFetch, this.connectTimeoutMs, this.bufferSize);
 	}
 
 	@Override
@@ -211,6 +220,16 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 		}
 	}
 
+	public void setConnectTimeoutMs(int connectTimeoutMs) {
+		Preconditions.checkArgument(connectTimeoutMs > 0, "The timeout must be positive");
+		this.connectTimeoutMs = connectTimeoutMs;
+	}
+
+	public void setBufferSize(int bufferSize) {
+		Preconditions.checkArgument(bufferSize > 0, "The buffer size must be positive");
+		this.bufferSize = bufferSize;
+	}
+
 	@Override
 	public void cancel() {
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/79a92a64/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
index 420c6db..40e1ff2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
@@ -33,7 +33,9 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 	protected List<KafkaSinglePartitionIterator> partitions;
 	protected final int waitOnEmptyFetch;
 
-	public KafkaMultiplePartitionsIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
+	public KafkaMultiplePartitionsIterator(String hostName, String topic,
+										Map<Integer, KafkaOffset> partitionsWithOffset,
+										int waitOnEmptyFetch, int connectTimeoutMs, int bufferSize) {
 		partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
 
 		String[] hostAndPort = hostName.split(":");
@@ -49,7 +51,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 					port,
 					topic,
 					partitionWithOffset.getKey(),
-					partitionWithOffset.getValue()));
+					partitionWithOffset.getValue(), connectTimeoutMs, bufferSize));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/79a92a64/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
index 4c1468a..cf49e43 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
@@ -60,6 +60,8 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	private List<String> replicaBrokers;
 	private String clientName;
 	private String leadBroker;
+	private final int connectTimeoutMs;
+	private final int bufferSize;
 
 	private KafkaOffset initialOffset;
 	private transient Iterator<MessageAndOffset> iter;
@@ -74,11 +76,13 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	 * @param topic Name of the topic to listen to
 	 * @param partition Partition in the chosen topic
 	 */
-	public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset) {
+	public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset,
+										int connectTimeoutMs, int bufferSize) {
 		this.hosts = new ArrayList<String>();
 		hosts.add(hostName);
 		this.port = port;
-
+		this.connectTimeoutMs = connectTimeoutMs;
+		this.bufferSize = bufferSize;
 		this.topic = topic;
 		this.partition = partition;
 
@@ -114,7 +118,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		leadBroker = metadata.leader().host();
 		clientName = "Client_" + topic + "_" + partition;
 
-		consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+		consumer = new SimpleConsumer(leadBroker, port, connectTimeoutMs, bufferSize, clientName);
 
 		readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
 
@@ -236,7 +240,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		for (String seed : a_hosts) {
 			SimpleConsumer consumer = null;
 			try {
-				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
+				consumer = new SimpleConsumer(seed, a_port, connectTimeoutMs, bufferSize, "leaderLookup");
 				List<String> topics = Collections.singletonList(a_topic);
 				TopicMetadataRequest req = new TopicMetadataRequest(topics);
 				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);