You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/03 00:53:18 UTC

kafka git commit: MINOR: update producer client request timeout in system test

Repository: kafka
Updated Branches:
  refs/heads/trunk b6765e46c -> e4208b1d5


MINOR: update producer client request timeout in system test

Author: Bill Bejeck <bi...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #4168 from bbejeck/MINOR_update_streams_produer_timeout_in_system_test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4208b1d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4208b1d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4208b1d

Branch: refs/heads/trunk
Commit: e4208b1d5fa1c28ac7e64e2cb039404a14084dc0
Parents: b6765e4
Author: Bill Bejeck <bi...@confluent.io>
Authored: Thu Nov 2 17:53:15 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 2 17:53:15 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/perf/SimpleBenchmark.java     | 2 ++
 .../test/java/org/apache/kafka/streams/perf/YahooBenchmark.java | 5 +++++
 .../org/apache/kafka/streams/tests/BrokerCompatibilityTest.java | 2 ++
 .../test/java/org/apache/kafka/streams/tests/EosTestClient.java | 3 +++
 .../java/org/apache/kafka/streams/tests/SmokeTestClient.java    | 4 ++--
 5 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index d1e3206..592c0e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -232,6 +232,8 @@ public class SimpleBenchmark {
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
         props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
+        //TODO remove this config or set to smaller value when KIP-91 is merged
+        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index d98fd7f..9490101 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 package org.apache.kafka.streams.perf;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -28,6 +30,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -195,6 +198,8 @@ public class YahooBenchmark {
 
         CountDownLatch latch = new CountDownLatch(1);
         Properties props = parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
+        //TODO remove this config or set to smaller value when KIP-91 is merged
+        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
 
         final KafkaStreams streams = createYahooBenchmarkStreams(props, campaignsTopic, eventsTopic, latch, parent.numRecords);
         parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 3513895..ca7620d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -68,6 +68,8 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
         streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), timeout);
         streamsProperties.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout + 1);
+        //TODO remove this config or set to smaller value when KIP-91 is merged
+        streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
 
 
         final StreamsBuilder builder = new StreamsBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 1c11be4..2bf90d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.tests;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
@@ -125,6 +126,8 @@ public class EosTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        //TODO remove this config or set to smaller value when KIP-91 is merged
+        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
 
 
         final StreamsBuilder builder = new StreamsBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 4b75702..b4ed127 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -104,6 +104,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         props.put(ProducerConfig.ACKS_CONFIG, "all");
+        //TODO remove this config or set to smaller value when KIP-91 is merged
+        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
 
         StreamsBuilder builder = new StreamsBuilder();
         Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
@@ -115,7 +117,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                 return value == null || value != END;
             }
         });
-
         data.process(SmokeTestUtil.printProcessorSupplier("data"));
 
         // min
@@ -189,7 +190,6 @@ public class SmokeTestClient extends SmokeTestUtil {
         Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
         KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
         sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
-
         // cnt
         groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)), "uwin-cnt")
             .toStream().map(