Posted to commits@pinot.apache.org by sn...@apache.org on 2018/12/06 21:40:20 UTC

[incubator-pinot] branch master updated: add kafka simple consumer buffer and timeout to stream config (#3584)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ea628a9  add kafka simple consumer buffer and timeout to stream config (#3584)
ea628a9 is described below

commit ea628a93cc1bb7d7b0034e40460ba8b8e05bc709
Author: James Shao <ja...@users.noreply.github.com>
AuthorDate: Thu Dec 6 13:40:13 2018 -0800

    add kafka simple consumer buffer and timeout to stream config (#3584)
    
    * add kafka simple consumer buffer and timeout to stream config
    
    * add missing header, optimize imports
    
    * update based on feedback
---
 .../impl/kafka/KafkaConnectionHandler.java         |  22 +++--
 .../impl/kafka/KafkaLowLevelStreamConfig.java      |  48 +++++++++-
 .../impl/kafka/KafkaStreamConfigProperties.java    |   4 +
 .../impl/kafka/KafkaLowLevelStreamConfigTest.java  | 100 +++++++++++++++++++++
 .../kafka/KafkaPartitionLevelConsumerTest.java     |  50 +++++++++++
 5 files changed, 214 insertions(+), 10 deletions(-)
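
The commit replaces two hard-coded constants in KafkaConnectionHandler
(SOCKET_TIMEOUT_MILLIS = 10000, SOCKET_BUFFER_SIZE = 512000) with stream-level
configs. A minimal sketch of how a realtime table's stream config map could
override them -- the broker hosts and values below are hypothetical, and any
key left unset falls back to the old defaults:

    import java.util.HashMap;
    import java.util.Map;

    Map<String, String> streamConfigMap = new HashMap<>();
    streamConfigMap.put("streamType", "kafka");
    streamConfigMap.put("stream.kafka.broker.list", "broker1:9092,broker2:9092");
    // New in this commit: socket receive buffer size in bytes (default 512000)
    streamConfigMap.put("stream.kafka.buffer.size", "1048576");
    // New in this commit: socket timeout in milliseconds (default 10000)
    streamConfigMap.put("stream.kafka.socket.timeout", "30000");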

diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
index b47caf9..d7f6d1f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.core.realtime.impl.kafka;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -42,8 +43,6 @@ import org.slf4j.LoggerFactory;
  */
 public class KafkaConnectionHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectionHandler.class);
-  private static final int SOCKET_TIMEOUT_MILLIS = 10000;
-  private static final int SOCKET_BUFFER_SIZE = 512000;
 
   enum ConsumerState {
     CONNECTING_TO_BOOTSTRAP_NODE,
@@ -65,6 +64,8 @@ public class KafkaConnectionHandler {
   KafkaBrokerWrapper _leader;
   String _currentHost;
   int _currentPort;
+  int _bufferSize;
+  int _socketTimeout;
 
   final KafkaSimpleConsumerFactory _simpleConsumerFactory;
   SimpleConsumer _simpleConsumer;
@@ -73,6 +74,11 @@ public class KafkaConnectionHandler {
 
   boolean isPartitionProvided;
 
+  @VisibleForTesting
+  public SimpleConsumer getSimpleConsumer() {
+    return _simpleConsumer;
+  }
+
   /**
    * A Kafka protocol error that indicates a situation that is not likely to clear up by retrying the request (for
    * example, no such topic or offset out of range).
@@ -110,6 +116,8 @@ public class KafkaConnectionHandler {
     isPartitionProvided = false;
     _partition = Integer.MIN_VALUE;
 
+    _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
+    _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
     initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
     setCurrentState(new ConnectingToBootstrapNode());
   }
@@ -133,6 +141,8 @@ public class KafkaConnectionHandler {
     isPartitionProvided = true;
     _partition = partition;
 
+    _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
+    _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
     initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
     setCurrentState(new ConnectingToBootstrapNode());
   }
@@ -216,8 +226,8 @@ public class KafkaConnectionHandler {
 
       try {
         LOGGER.info("Connecting to bootstrap host {}:{} for topic {}", _currentHost, _currentPort, _topic);
-        _simpleConsumer = _simpleConsumerFactory.buildSimpleConsumer(_currentHost, _currentPort, SOCKET_TIMEOUT_MILLIS,
-            SOCKET_BUFFER_SIZE, _clientId);
+        _simpleConsumer = _simpleConsumerFactory.buildSimpleConsumer(_currentHost, _currentPort, _socketTimeout,
+            _bufferSize, _clientId);
         setCurrentState(new ConnectedToBootstrapNode());
       } catch (Exception e) {
         handleConsumerException(e);
@@ -326,8 +336,8 @@ public class KafkaConnectionHandler {
       // Connect to the partition leader
       try {
         _simpleConsumer =
-            _simpleConsumerFactory.buildSimpleConsumer(_leader.host(), _leader.port(), SOCKET_TIMEOUT_MILLIS,
-                SOCKET_BUFFER_SIZE, _clientId);
+            _simpleConsumerFactory.buildSimpleConsumer(_leader.host(), _leader.port(), _socketTimeout,
+                _bufferSize, _clientId);
 
         setCurrentState(new ConnectedToPartitionLeader());
       } catch (Exception e) {
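
Both consumer-creation sites above now thread the configured values through the
factory. As a point of reference (a sketch, not code from this commit), the
underlying kafka.javaapi.consumer.SimpleConsumer takes the two values directly,
so this is where the new configs ultimately land:

    // Assumed factory body for illustration; the real KafkaSimpleConsumerFactory
    // implementation in pinot-core may differ in details.
    SimpleConsumer buildSimpleConsumer(String host, int port, int soTimeout,
        int bufferSize, String clientId) {
      // soTimeout is the socket timeout in ms, bufferSize the receive buffer
      // in bytes -- the two values made configurable by this change.
      return new SimpleConsumer(host, port, soTimeout, bufferSize, clientId);
    }
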
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
index de63f5a..1b3a6c8 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
@@ -18,6 +18,8 @@ package com.linkedin.pinot.core.realtime.impl.kafka;
 import com.google.common.base.Preconditions;
 import com.linkedin.pinot.common.utils.EqualityUtils;
 import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import org.apache.commons.lang.StringUtils;
+
 import java.util.Map;
 
 
@@ -28,6 +30,8 @@ public class KafkaLowLevelStreamConfig {
 
   private String _kafkaTopicName;
   private String _bootstrapHosts;
+  private int _kafkaBufferSize;
+  private int _kafkaSocketTimeout;
 
   /**
    * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
@@ -40,7 +44,15 @@ public class KafkaLowLevelStreamConfig {
 
     String llcBrokerListKey =
         KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
+    String llcBufferKey =
+        KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
+    String llcTimeoutKey =
+        KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
     _bootstrapHosts = streamConfigMap.get(llcBrokerListKey);
+    _kafkaBufferSize = getIntConfigWithDefault(streamConfigMap, llcBufferKey,
+        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
+    _kafkaSocketTimeout = getIntConfigWithDefault(streamConfigMap, llcTimeoutKey,
+        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
     Preconditions.checkNotNull(_bootstrapHosts,
         "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
   }
@@ -53,12 +65,36 @@ public class KafkaLowLevelStreamConfig {
     return _bootstrapHosts;
   }
 
+  public int getKafkaBufferSize() {
+    return _kafkaBufferSize;
+  }
+
+  public int getKafkaSocketTimeout() {
+    return _kafkaSocketTimeout;
+  }
+
+  private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
+    String stringValue = configMap.get(key);
+    try {
+      if (StringUtils.isNotEmpty(stringValue)) {
+        return Integer.parseInt(stringValue);
+      }
+      return defaultValue;
+    } catch (NumberFormatException ex) {
+      return defaultValue;
+    }
+  }
+
   @Override
   public String toString() {
-    return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
-        + _bootstrapHosts + '\'' + '}';
+    return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\''
+        + ", _bootstrapHosts='" + _bootstrapHosts + '\''
+        + ", _kafkaBufferSize='" + _kafkaBufferSize + '\''
+        + ", _kafkaSocketTimeout='" + _kafkaSocketTimeout + '\''
+        + '}';
   }
 
 
   @Override
   public boolean equals(Object o) {
     if (EqualityUtils.isSameReference(this, o)) {
@@ -71,14 +107,18 @@ public class KafkaLowLevelStreamConfig {
 
     KafkaLowLevelStreamConfig that = (KafkaLowLevelStreamConfig) o;
 
-    return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils.isEqual(_bootstrapHosts,
-        that._bootstrapHosts);
+    return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName)
+        && EqualityUtils.isEqual(_bootstrapHosts, that._bootstrapHosts)
+        && EqualityUtils.isEqual(_kafkaBufferSize, that._kafkaBufferSize)
+        && EqualityUtils.isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout);
   }
 
   @Override
   public int hashCode() {
     int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
     result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
+    result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
+    result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
     return result;
   }
 }
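
Note that getIntConfigWithDefault() is deliberately lenient: a missing key, an
empty string, and an unparseable value all silently fall back to the default
rather than failing the table config. For illustration (values hypothetical):

    Map<String, String> configMap = new HashMap<>();
    configMap.put("stream.kafka.buffer.size", "not-a-number");
    // getIntConfigWithDefault(configMap, "stream.kafka.buffer.size", 512000)
    //   -> 512000 (NumberFormatException swallowed, default returned)
    configMap.put("stream.kafka.buffer.size", "1048576");
    // getIntConfigWithDefault(configMap, "stream.kafka.buffer.size", 512000)
    //   -> 1048576
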
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
index cc5bcf0..be62ec5 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
@@ -39,6 +39,10 @@ public class KafkaStreamConfigProperties {
 
   public static class LowLevelConsumer {
     public static final String KAFKA_BROKER_LIST = "kafka.broker.list";
+    public static final String KAFKA_BUFFER_SIZE = "kafka.buffer.size";
+    public static final int KAFKA_BUFFER_SIZE_DEFAULT = 512000;
+    public static final String KAFKA_SOCKET_TIMEOUT = "kafka.socket.timeout";
+    public static final int KAFKA_SOCKET_TIMEOUT_DEFAULT = 10000;
   }
 
   public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
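
The new constants hold the unprefixed property names; constructStreamProperty()
prepends the stream prefix, so (as the new tests below confirm) the full keys
users actually set come out as:

    // Sketch of the resulting full config keys:
    String bufferKey = KafkaStreamConfigProperties.constructStreamProperty(
        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
    // -> "stream.kafka.buffer.size"
    String timeoutKey = KafkaStreamConfigProperties.constructStreamProperty(
        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
    // -> "stream.kafka.socket.timeout"
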
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
new file mode 100644
index 0000000..289e140
--- /dev/null
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.realtime.impl.kafka;
+
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaLowLevelStreamConfigTest {
+
+  private KafkaLowLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, String socketTimeout) {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    String streamType = "kafka";
+    String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
+    String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
+    String decoderClass = KafkaAvroMessageDecoder.class.getName();
+    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClassName);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
+    streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
+    if (buffer != null) {
+      streamConfigMap.put("stream.kafka.buffer.size", buffer);
+    }
+    if (socketTimeout != null) {
+      streamConfigMap.put("stream.kafka.socket.timeout", String.valueOf(socketTimeout));
+    }
+    return new KafkaLowLevelStreamConfig(new StreamConfig(streamConfigMap));
+  }
+
+  @Test
+  public void testGetKafkaTopicName() {
+    KafkaLowLevelStreamConfig config = getStreamConfig("topic", "", "", "");
+    Assert.assertEquals("topic", config.getKafkaTopicName());
+  }
+
+  @Test
+  public void testGetBootstrapHosts() {
+    KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals("host1", config.getBootstrapHosts());
+  }
+
+  @Test
+  public void testGetKafkaBufferSize() {
+    // test default
+    KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", null, "");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());
+
+    config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());
+
+    config = getStreamConfig("topic", "host1", "bad value", "");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, config.getKafkaBufferSize());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "100", "");
+    Assert.assertEquals(100, config.getKafkaBufferSize());
+  }
+
+  @Test
+  public void testGetKafkaSocketTimeout() {
+    // test default
+    KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "",null);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());
+
+    config = getStreamConfig("topic", "host1", "","");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());
+
+    config = getStreamConfig("topic", "host1", "","bad value");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, config.getKafkaSocketTimeout());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "", "100");
+    Assert.assertEquals(100, config.getKafkaSocketTimeout());
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
index 73adca5..4316008 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.core.realtime.kafka;
 import com.google.common.base.Preconditions;
 import com.linkedin.pinot.core.realtime.impl.kafka.KafkaPartitionLevelConsumer;
 import com.linkedin.pinot.core.realtime.impl.kafka.KafkaSimpleConsumerFactory;
+import com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
 import com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamMetadataProvider;
 import com.linkedin.pinot.core.realtime.stream.OffsetCriteria;
 import com.linkedin.pinot.core.realtime.stream.StreamConfig;
@@ -209,6 +210,55 @@ public class KafkaPartitionLevelConsumerTest {
   }
 
   @Test
+  public void testBuildConsumer() throws Exception {
+     String streamType = "kafka";
+    String streamKafkaTopicName = "theTopic";
+    String streamKafkaBrokerList = "abcd:1234,bcde:2345";
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    MockKafkaSimpleConsumerFactory mockKafkaSimpleConsumerFactory = new MockKafkaSimpleConsumerFactory(
+        new String[] { "abcd", "bcde" },
+        new int[] { 1234, 2345 },
+        new long[] { 12345L, 23456L },
+        new long[] { 23456L, 34567L },
+        new int[] { 0, 1 },
+        streamKafkaTopicName
+    );
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", mockKafkaSimpleConsumerFactory.getClass().getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    KafkaStreamMetadataProvider streamMetadataProvider =
+        new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
+
+    // test default value
+    KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
+        new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
+
+    // test user defined values
+    streamConfigMap.put("stream.kafka.buffer.size", "100");
+    streamConfigMap.put("stream.kafka.socket.timeout", "1000");
+    streamConfig = new StreamConfig(streamConfigMap);
+    kafkaSimpleStreamConsumer =
+        new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+    Assert.assertEquals(100, kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
+    Assert.assertEquals(1000, kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
+  }
+
+  @Test
   public void testGetPartitionCount() {
     String streamType = "kafka";
     String streamKafkaTopicName = "theTopic";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org