You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/08/01 03:32:04 UTC

[incubator-openwhisk] branch master updated: Bump kafka-clients version to 2.0.0. (#3913)

This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d7e9ce6  Bump kafka-clients version to 2.0.0. (#3913)
d7e9ce6 is described below

commit d7e9ce67b552fbb24ad55aa074178d5485ccd546
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Wed Aug 1 05:31:59 2018 +0200

    Bump kafka-clients version to 2.0.0. (#3913)
    
    * Bump kafka-clients version to 2.0.0.
    
    * Lower the request timeout again.
    
    Relevant bit from the kafka documentation:
    
    > Also as part of KIP-266, the default value of request.timeout.ms has been changed to 30 seconds. The previous value was a little higher than 5 minutes to account for maximum time that a rebalance would take. Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from max.poll.interval.ms for the request timeout. All other request types use the timeout defined by request.timeout.ms
    
    * Disable hostname verification.
---
 common/scala/build.gradle                          |  2 +-
 common/scala/src/main/resources/application.conf   |  5 ++--
 .../connector/kafka/KafkaConsumerConnector.scala   |  3 ++-
 .../src/main/scala/whisk/utils/TimeHelpers.scala   | 28 ++++++++++++++++++++++
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 5edea10..c4ae763 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -51,7 +51,7 @@ dependencies {
     compile 'commons-codec:commons-codec:1.9'
     compile 'commons-io:commons-io:2.6'
     compile 'commons-collections:commons-collections:3.2.2'
-    compile 'org.apache.kafka:kafka-clients:0.11.0.1'
+    compile 'org.apache.kafka:kafka-clients:2.0.0'
     compile ('org.apache.httpcomponents:httpclient:4.5.5') {
         exclude group: 'commons-logging'
     }
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 0c0e4e4..b12c62e 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -60,6 +60,7 @@ whisk {
 
         common {
             security-protocol = PLAINTEXT
+            ssl-endpoint-identification-algorithm = "" // restores pre-kafka 2.0.0 default
         }
         producer {
             acks = 1
@@ -72,11 +73,9 @@ whisk {
             heartbeat-interval-ms = 10000
             enable-auto-commit = false
             auto-offset-reset = earliest
+            request-timeout-ms = 30000
 
-            // request-timeout-ms always needs to be larger than max-poll-interval-ms per
-            // https://kafka.apache.org/documentation/#upgrade_1010_notable
             max-poll-interval-ms = 1800000 // 30 minutes
-            request-timeout-ms = 1860000 // 31 minutes
 
             // This value controls the server-side wait time which affects polling latency.
             // A low value improves latency performance but it is important to not set it too low
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 7e28f15..bb05720 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -27,6 +27,7 @@ import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
 import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
+import whisk.utils.TimeHelpers._
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
@@ -73,7 +74,7 @@ class KafkaConsumerConnector(
     }
 
     try {
-      val response = consumer.poll(duration.toMillis).asScala
+      val response = consumer.poll(duration).asScala
 
       // Cancel the scheduled wake-up task immediately.
       wakeUpTask.cancel()
diff --git a/common/scala/src/main/scala/whisk/utils/TimeHelpers.scala b/common/scala/src/main/scala/whisk/utils/TimeHelpers.scala
new file mode 100644
index 0000000..ff3df51
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/utils/TimeHelpers.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.utils
+
+import java.time.{Duration => JDuration}
+
+import scala.concurrent.duration.{Duration => SDuration}
+import scala.language.implicitConversions
+
+object TimeHelpers {
+  implicit def toJavaDuration(d: SDuration): JDuration = JDuration.ofNanos(d.toNanos)
+  implicit def toScalaDuration(d: JDuration): SDuration = SDuration.fromNanos(d.toNanos)
+}