Posted to commits@pinot.apache.org by sn...@apache.org on 2018/12/06 21:40:20 UTC
[incubator-pinot] branch master updated: add kafka simple consumer buffer and timeout to stream config (#3584)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ea628a9 add kafka simple consumer buffer and timeout to stream config (#3584)
ea628a9 is described below
commit ea628a93cc1bb7d7b0034e40460ba8b8e05bc709
Author: James Shao <ja...@users.noreply.github.com>
AuthorDate: Thu Dec 6 13:40:13 2018 -0800
add kafka simple consumer buffer and timeout to stream config (#3584)
* add kafka simple consumer buffer and timeout to stream config
* add missing header, optimize imports
* update based on feedback
---
.../impl/kafka/KafkaConnectionHandler.java | 22 +++--
.../impl/kafka/KafkaLowLevelStreamConfig.java | 48 +++++++++-
.../impl/kafka/KafkaStreamConfigProperties.java | 4 +
.../impl/kafka/KafkaLowLevelStreamConfigTest.java | 100 +++++++++++++++++++++
.../kafka/KafkaPartitionLevelConsumerTest.java | 50 +++++++++++
5 files changed, 214 insertions(+), 10 deletions(-)
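In practice, the two new knobs ride under the existing stream.kafka.* namespace. A minimal sketch of a stream config map that overrides both settings (key names and defaults come from this commit; the topic, broker list, and override values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: a stream config map overriding the two new consumer settings.
    Map<String, String> streamConfigMap = new HashMap<>();
    streamConfigMap.put("streamType", "kafka");
    streamConfigMap.put("stream.kafka.topic.name", "myTopic");
    streamConfigMap.put("stream.kafka.broker.list", "broker1:9092,broker2:9092");
    streamConfigMap.put("stream.kafka.consumer.type", "simple");
    // New in this commit. Omit either key to fall back to the defaults:
    // 512000 bytes for the buffer, 10000 ms for the socket timeout.
    streamConfigMap.put("stream.kafka.buffer.size", "1024000");
    streamConfigMap.put("stream.kafka.socket.timeout", "30000");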
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
index b47caf9..d7f6d1f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.core.realtime.impl.kafka;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -42,8 +43,6 @@ import org.slf4j.LoggerFactory;
*/
public class KafkaConnectionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectionHandler.class);
- private static final int SOCKET_TIMEOUT_MILLIS = 10000;
- private static final int SOCKET_BUFFER_SIZE = 512000;
enum ConsumerState {
CONNECTING_TO_BOOTSTRAP_NODE,
@@ -65,6 +64,8 @@ public class KafkaConnectionHandler {
KafkaBrokerWrapper _leader;
String _currentHost;
int _currentPort;
+ int _bufferSize;
+ int _socketTimeout;
final KafkaSimpleConsumerFactory _simpleConsumerFactory;
SimpleConsumer _simpleConsumer;
@@ -73,6 +74,11 @@ public class KafkaConnectionHandler {
boolean isPartitionProvided;
+ @VisibleForTesting
+ public SimpleConsumer getSimpleConsumer() {
+ return _simpleConsumer;
+ }
+
/**
* A Kafka protocol error that indicates a situation that is not likely to clear up by retrying the request (for
* example, no such topic or offset out of range).
@@ -110,6 +116,8 @@ public class KafkaConnectionHandler {
isPartitionProvided = false;
_partition = Integer.MIN_VALUE;
+ _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
+ _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
setCurrentState(new ConnectingToBootstrapNode());
}
@@ -133,6 +141,8 @@ public class KafkaConnectionHandler {
isPartitionProvided = true;
_partition = partition;
+ _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
+ _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
setCurrentState(new ConnectingToBootstrapNode());
}
@@ -216,8 +226,8 @@ public class KafkaConnectionHandler {
try {
LOGGER.info("Connecting to bootstrap host {}:{} for topic {}", _currentHost, _currentPort, _topic);
- _simpleConsumer = _simpleConsumerFactory.buildSimpleConsumer(_currentHost, _currentPort, SOCKET_TIMEOUT_MILLIS,
- SOCKET_BUFFER_SIZE, _clientId);
+ _simpleConsumer = _simpleConsumerFactory.buildSimpleConsumer(_currentHost, _currentPort, _socketTimeout,
+ _bufferSize, _clientId);
setCurrentState(new ConnectedToBootstrapNode());
} catch (Exception e) {
handleConsumerException(e);
@@ -326,8 +336,8 @@ public class KafkaConnectionHandler {
// Connect to the partition leader
try {
_simpleConsumer =
- _simpleConsumerFactory.buildSimpleConsumer(_leader.host(), _leader.port(), SOCKET_TIMEOUT_MILLIS,
- SOCKET_BUFFER_SIZE, _clientId);
+ _simpleConsumerFactory.buildSimpleConsumer(_leader.host(), _leader.port(), _socketTimeout,
+ _bufferSize, _clientId);
setCurrentState(new ConnectedToPartitionLeader());
} catch (Exception e) {
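The handler now forwards the configured values straight into the factory. For reference, a pass-through factory over the old Kafka SimpleConsumer API would look roughly like this; the buildSimpleConsumer signature is taken from the call sites above, while extending KafkaSimpleConsumerFactory (rather than implementing an interface) is an assumption:

    import kafka.javaapi.consumer.SimpleConsumer;

    // Sketch of a pass-through factory. soTimeout and bufferSize now arrive
    // from the stream config instead of the former hard-coded
    // SOCKET_TIMEOUT_MILLIS / SOCKET_BUFFER_SIZE constants.
    public class PassThroughSimpleConsumerFactory extends KafkaSimpleConsumerFactory {
      @Override
      public SimpleConsumer buildSimpleConsumer(String host, int port, int soTimeout,
          int bufferSize, String clientId) {
        return new SimpleConsumer(host, port, soTimeout, bufferSize, clientId);
      }
    }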
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
index de63f5a..1b3a6c8 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
@@ -18,6 +18,8 @@ package com.linkedin.pinot.core.realtime.impl.kafka;
import com.google.common.base.Preconditions;
import com.linkedin.pinot.common.utils.EqualityUtils;
import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import org.apache.commons.lang.StringUtils;
+
import java.util.Map;
@@ -28,6 +30,8 @@ public class KafkaLowLevelStreamConfig {
private String _kafkaTopicName;
private String _bootstrapHosts;
+ private int _kafkaBufferSize;
+ private int _kafkaSocketTimeout;
/**
* Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
@@ -40,7 +44,15 @@ public class KafkaLowLevelStreamConfig {
String llcBrokerListKey =
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
+ String llcBufferKey =
+ KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
+ String llcTimeoutKey =
+ KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
_bootstrapHosts = streamConfigMap.get(llcBrokerListKey);
+ _kafkaBufferSize = getIntConfigWithDefault(streamConfigMap, llcBufferKey,
+ KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
+ _kafkaSocketTimeout = getIntConfigWithDefault(streamConfigMap, llcTimeoutKey,
+ KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
Preconditions.checkNotNull(_bootstrapHosts,
"Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
}
@@ -53,12 +65,36 @@ public class KafkaLowLevelStreamConfig {
return _bootstrapHosts;
}
+ public int getKafkaBufferSize() {
+ return _kafkaBufferSize;
+ }
+
+ public int getKafkaSocketTimeout() {
+ return _kafkaSocketTimeout;
+ }
+
+ private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
+ String stringValue = configMap.get(key);
+ try {
+ if (StringUtils.isNotEmpty(stringValue)) {
+ return Integer.parseInt(stringValue);
+ }
+ return defaultValue;
+ } catch (NumberFormatException ex) {
+ return defaultValue;
+ }
+ }
+
@Override
public String toString() {
- return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
- + _bootstrapHosts + '\'' + '}';
+ return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\''
+ + ", _bootstrapHosts='" + _bootstrapHosts + '\''
+ + ", _kafkaBufferSize='" + _kafkaBufferSize + '\''
+ + ", _kafkaSocketTimeout='" + _kafkaSocketTimeout + '\''
+ + '}';
}
+
@Override
public boolean equals(Object o) {
if (EqualityUtils.isSameReference(this, o)) {
@@ -71,14 +107,18 @@ public class KafkaLowLevelStreamConfig {
KafkaLowLevelStreamConfig that = (KafkaLowLevelStreamConfig) o;
- return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils.isEqual(_bootstrapHosts,
- that._bootstrapHosts);
+ return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName)
+ && EqualityUtils.isEqual(_bootstrapHosts, that._bootstrapHosts)
+ && EqualityUtils.isEqual(_kafkaBufferSize, that._kafkaBufferSize)
+ && EqualityUtils.isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout);
}
@Override
public int hashCode() {
int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
+ result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
+ result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
return result;
}
}
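Note that the helper treats a missing key, an empty value, and an unparsable value identically. A self-contained restatement of that fallback logic, runnable on its own (the class and method names here are hypothetical; the behavior mirrors the private getIntConfigWithDefault above):

    import java.util.HashMap;
    import java.util.Map;

    public class IntConfigFallbackDemo {
      // Same logic as the private getIntConfigWithDefault, restated so it
      // can run standalone (plain null/empty check instead of StringUtils).
      static int intOrDefault(Map<String, String> configs, String key, int def) {
        String value = configs.get(key);
        if (value == null || value.isEmpty()) {
          return def;
        }
        try {
          return Integer.parseInt(value);
        } catch (NumberFormatException e) {
          return def;
        }
      }

      public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("empty.key", "");
        configs.put("bad.key", "not-a-number");
        configs.put("good.key", "100");
        System.out.println(intOrDefault(configs, "absent.key", 512000)); // 512000
        System.out.println(intOrDefault(configs, "empty.key", 512000));  // 512000
        System.out.println(intOrDefault(configs, "bad.key", 512000));    // 512000
        System.out.println(intOrDefault(configs, "good.key", 512000));   // 100
      }
    }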
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
index cc5bcf0..be62ec5 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
@@ -39,6 +39,10 @@ public class KafkaStreamConfigProperties {
public static class LowLevelConsumer {
public static final String KAFKA_BROKER_LIST = "kafka.broker.list";
+ public static final String KAFKA_BUFFER_SIZE = "kafka.buffer.size";
+ public static final int KAFKA_BUFFER_SIZE_DEFAULT = 512000;
+ public static final String KAFKA_SOCKET_TIMEOUT = "kafka.socket.timeout";
+ public static final int KAFKA_SOCKET_TIMEOUT_DEFAULT = 10000;
}
public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
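These short constants become full stream properties via constructStreamProperty, matching the keys the new tests set literally; the "stream.kafka." prefixing is inferred from those test strings rather than shown in this diff:

    // Inferred mapping, based on the literal keys in the tests below:
    //   KAFKA_BUFFER_SIZE    ("kafka.buffer.size")    -> "stream.kafka.buffer.size"
    //   KAFKA_SOCKET_TIMEOUT ("kafka.socket.timeout") -> "stream.kafka.socket.timeout"
    String bufferKey = KafkaStreamConfigProperties.constructStreamProperty(
        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
    String timeoutKey = KafkaStreamConfigProperties.constructStreamProperty(
        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);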
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
new file mode 100644
index 0000000..289e140
--- /dev/null
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.realtime.impl.kafka;
+
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaLowLevelStreamConfigTest {
+
+ private KafkaLowLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, String socketTimeout) {
+ Map<String, String> streamConfigMap = new HashMap<>();
+ String streamType = "kafka";
+ String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
+ String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
+ String decoderClass = KafkaAvroMessageDecoder.class.getName();
+ streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClassName);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
+ streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
+ if (buffer != null) {
+ streamConfigMap.put("stream.kafka.buffer.size", buffer);
+ }
+ if (socketTimeout != null) {
+ streamConfigMap.put("stream.kafka.socket.timeout", String.valueOf(socketTimeout));
+ }
+ return new KafkaLowLevelStreamConfig(new StreamConfig(streamConfigMap));
+ }
+
+ @Test
+ public void testGetKafkaTopicName() {
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "", "", "");
+ Assert.assertEquals("topic", config.getKafkaTopicName());
+ }
+
+ @Test
+ public void testGetBootstrapHosts() {
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "", "");
+ Assert.assertEquals("host1", config.getBootstrapHosts());
+ }
+
+ @Test
+ public void testGetKafkaBufferSize() {
+ // test default
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", null, "");
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());
+
+ config = getStreamConfig("topic", "host1", "", "");
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());
+
+ config = getStreamConfig("topic", "host1", "bad value", "");
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "100", "");
+ Assert.assertEquals(100, config.getKafkaBufferSize());
+ }
+
+ @Test
+ public void testGetKafkaSocketTimeout() {
+ // test default
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "", null);
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());
+
+ config = getStreamConfig("topic", "host1", "", "");
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());
+
+ config = getStreamConfig("topic", "host1", "", "bad value");
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "", "100");
+ Assert.assertEquals(100, config.getKafkaSocketTimeout());
+ }
+}
\ No newline at end of file
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
index 73adca5..4316008 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.core.realtime.kafka;
import com.google.common.base.Preconditions;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaPartitionLevelConsumer;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaSimpleConsumerFactory;
+import com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamMetadataProvider;
import com.linkedin.pinot.core.realtime.stream.OffsetCriteria;
import com.linkedin.pinot.core.realtime.stream.StreamConfig;
@@ -209,6 +210,55 @@ public class KafkaPartitionLevelConsumerTest {
}
@Test
+ public void testBuildConsumer() throws Exception {
+ String streamType = "kafka";
+ String streamKafkaTopicName = "theTopic";
+ String streamKafkaBrokerList = "abcd:1234,bcde:2345";
+ String streamKafkaConsumerType = "simple";
+ String clientId = "clientId";
+
+ MockKafkaSimpleConsumerFactory mockKafkaSimpleConsumerFactory = new MockKafkaSimpleConsumerFactory(
+ new String[] { "abcd", "bcde" },
+ new int[] { 1234, 2345 },
+ new long[] { 12345L, 23456L },
+ new long[] { 23456L, 34567L },
+ new int[] { 0, 1 },
+ streamKafkaTopicName
+ );
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name", mockKafkaSimpleConsumerFactory.getClass().getName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+ KafkaStreamMetadataProvider streamMetadataProvider =
+ new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
+
+ // test default value
+ KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
+ new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
+ kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
+
+ // test user defined values
+ streamConfigMap.put("stream.kafka.buffer.size", "100");
+ streamConfigMap.put("stream.kafka.socket.timeout", "1000");
+ streamConfig = new StreamConfig(streamConfigMap);
+ kafkaSimpleStreamConsumer =
+ new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
+ kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+ Assert.assertEquals(100, kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
+ Assert.assertEquals(1000, kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
+ }
+
+ @Test
public void testGetPartitionCount() {
String streamType = "kafka";
String streamKafkaTopicName = "theTopic";