You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by du...@apache.org on 2018/08/01 03:32:04 UTC
[incubator-openwhisk] branch master updated: Bump kafka-clients version to 2.0.0. (#3913)
This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new d7e9ce6 Bump kafka-clients version to 2.0.0. (#3913)
d7e9ce6 is described below
commit d7e9ce67b552fbb24ad55aa074178d5485ccd546
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Wed Aug 1 05:31:59 2018 +0200
Bump kafka-clients version to 2.0.0. (#3913)
* Bump kafka-clients version to 2.0.0.
* Lower the request timeout again.
Relevant bit from the kafka documentation:
> Also as part of KIP-266, the default value of request.timeout.ms has been changed to 30 seconds. The previous value was a little higher than 5 minutes to account for maximum time that a rebalance would take. Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from max.poll.interval.ms for the request timeout. All other request types use the timeout defined by request.timeout.ms
* Disable hostname verification.
---
common/scala/build.gradle | 2 +-
common/scala/src/main/resources/application.conf | 5 ++--
.../connector/kafka/KafkaConsumerConnector.scala | 3 ++-
.../src/main/scala/whisk/utils/TimeHelpers.scala | 28 ++++++++++++++++++++++
4 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 5edea10..c4ae763 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -51,7 +51,7 @@ dependencies {
compile 'commons-codec:commons-codec:1.9'
compile 'commons-io:commons-io:2.6'
compile 'commons-collections:commons-collections:3.2.2'
- compile 'org.apache.kafka:kafka-clients:0.11.0.1'
+ compile 'org.apache.kafka:kafka-clients:2.0.0'
compile ('org.apache.httpcomponents:httpclient:4.5.5') {
exclude group: 'commons-logging'
}
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 0c0e4e4..b12c62e 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -60,6 +60,7 @@ whisk {
common {
security-protocol = PLAINTEXT
+ ssl-endpoint-identification-algorithm = "" // empty value disables hostname verification, restoring the pre-Kafka 2.0.0 default
}
producer {
acks = 1
@@ -72,11 +73,9 @@ whisk {
heartbeat-interval-ms = 10000
enable-auto-commit = false
auto-offset-reset = earliest
+ request-timeout-ms = 30000
- // request-timeout-ms always needs to be larger than max-poll-interval-ms per
- // https://kafka.apache.org/documentation/#upgrade_1010_notable
max-poll-interval-ms = 1800000 // 30 minutes
- request-timeout-ms = 1860000 // 31 minutes
// This value controls the server-side wait time which affects polling latency.
// A low value improves latency performance but it is important to not set it too low
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 7e28f15..bb05720 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -27,6 +27,7 @@ import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
import whisk.connector.kafka.KafkaConfiguration._
import whisk.core.ConfigKeys
import whisk.core.connector.MessageConsumer
+import whisk.utils.TimeHelpers._
import scala.collection.JavaConverters._
import scala.concurrent.duration._
@@ -73,7 +74,7 @@ class KafkaConsumerConnector(
}
try {
- val response = consumer.poll(duration.toMillis).asScala
+ val response = consumer.poll(duration).asScala
// Cancel the scheduled wake-up task immediately.
wakeUpTask.cancel()
diff --git a/common/scala/src/main/scala/whisk/utils/TimeHelpers.scala b/common/scala/src/main/scala/whisk/utils/TimeHelpers.scala
new file mode 100644
index 0000000..ff3df51
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/utils/TimeHelpers.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.utils
+
+import java.time.{Duration => JDuration}
+
+import scala.concurrent.duration.{Duration => SDuration}
+import scala.language.implicitConversions
+
+object TimeHelpers { // implicit conversions between scala.concurrent.duration.Duration and java.time.Duration
+ implicit def toJavaDuration(d: SDuration): JDuration = JDuration.ofNanos(d.toNanos) // Scala -> Java via the nanosecond count; NOTE(review): presumably callers only pass finite durations, since toNanos is not defined for infinite ones -- confirm
+ implicit def toScalaDuration(d: JDuration): SDuration = SDuration.fromNanos(d.toNanos) // Java -> Scala via the nanosecond count
+}