Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/08 15:51:46 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

vvcephei commented on a change in pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#discussion_r451634562



##########
File path: build.gradle
##########
@@ -1508,6 +1508,18 @@ project(':streams:upgrade-system-tests-24') {
   }
 }
 
+project(':streams:upgrade-system-tests-25') {

Review comment:
       I just realized that we should also test upgrading from 2.5.0 to 2.5.1, and likewise for every bugfix release. In other words, after each release we should add (or update) "LATEST_X_Y" in the test matrix of the X.Y branch.
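
A rough sketch of that rule, in plain Python rather than the kafkatest API: the helper `upgrade_sources` and the `released` mapping are hypothetical names, and the version strings are only illustrative. The point is that the branch's own line stays in the list, so a 2.5.0 to 2.5.1 upgrade is exercised.

```python
# Hypothetical sketch: the helper and the version table are illustrative,
# not part of kafkatest.
def upgrade_sources(branch, released):
    """Return the released versions that a build of `branch` should be upgraded from.

    `released` maps a release line "X.Y" to its latest released bugfix version.
    The branch's own line is included, so same-line bugfix upgrades are covered.
    """
    def key(line):
        return tuple(int(part) for part in line.split("."))

    return [released[line] for line in sorted(released, key=key) if key(line) <= key(branch)]


released = {"2.3": "2.3.1", "2.4": "2.4.1", "2.5": "2.5.0"}
print(upgrade_sources("2.5", released))  # ['2.3.1', '2.4.1', '2.5.0']
```

After a new bugfix release, only the entry for that line in `released` would need updating.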

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
##########
@@ -92,6 +94,13 @@ public void shouldWorkWithRebalance() throws InterruptedException {
 
         final Properties props = new Properties();
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");

Review comment:
       I'm not sure of the exact cause, but the integration test started failing after the (backported) change that moved the property definitions into Python. I'll send a separate PR to trunk that adds these configs to the integration test, in case their absence is a source of flakiness.

##########
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]

Review comment:
       Note that in 2.5 I was able to add 2.0 and 2.1 to the upgrade matrix. This means the 2.6 release did break upgradability from 2.1 and earlier to 2.6 and later: people would have to upgrade to 2.5 first and then upgrade to 2.6.
   
   Should we block the release and fix this for 2.6.0?
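
To make the two-hop consequence concrete, here is a small sketch in plain Python. The `direct_sources` table is an assumption derived only from this comment (2.5 accepts 2.0 and later, 2.6 accepts only 2.2 and later); it is not an authoritative compatibility matrix, and `upgrade_path` is a hypothetical helper.

```python
from collections import deque

# Assumption, taken only from the comment above: which release lines each
# target can be upgraded from directly. Not an official compatibility table.
direct_sources = {
    "2.5": ["2.0", "2.1", "2.2", "2.3", "2.4"],
    "2.6": ["2.2", "2.3", "2.4", "2.5"],
}


def upgrade_path(start, target):
    """Breadth-first search for the shortest chain of direct upgrades."""
    queue = deque([[start]])
    seen = {start}
    while queue:
        path = queue.popleft()
        if path[-1] == target:
            return path
        for dest, sources in direct_sources.items():
            if path[-1] in sources and dest not in seen:
                seen.add(dest)
                queue.append(path + [dest])
    return None  # no chain of supported upgrades exists


print(upgrade_path("2.1", "2.6"))  # ['2.1', '2.5', '2.6']
print(upgrade_path("2.3", "2.6"))  # ['2.3', '2.6']
```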

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##########
@@ -45,21 +47,48 @@
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
+                    private long smallestOffset = Long.MAX_VALUE;
+                    private long largestOffset = Long.MIN_VALUE;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
+                        LOG.info("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.flush();
                         numRecordsProcessed = 0;
+                        smallestOffset = Long.MAX_VALUE;
+                        largestOffset = Long.MIN_VALUE;
                     }
 
                     @Override
                     public void process(final Object key, final Object value) {
                         numRecordsProcessed++;
-                        if (numRecordsProcessed % 100 == 0) {

Review comment:
       I had to take this out to make the upgrade test pass. The problem was that, after upgrading, not every instance would get a task for the input topic (`data`). An instance that only has repartition topics to process has low throughput, and it wouldn't see even 100 records in a partition before the test timed out. The test then failed because it never saw the "processed" message in stdout.
   
   This is mildly concerning, but since this is a new mode of operation for this test, and since people have been complaining for a while about skewed assignments in 2.5 and before, I think we can just accept it in older branches.
   
   To clarify, by removing this condition, we print to stdout on every record we process, so when an instance only has the `min` repartition topic, for example, it'll still print something, even if it only gets two records to process within the time limit.
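
The effect of the removed condition can be illustrated with a toy model in plain Python. `progress_lines` is a hypothetical helper, not code from the test; it only counts how many "processed" lines a processor would print for a given number of records.

```python
def progress_lines(num_records, every):
    """Count the progress lines printed when logging once per `every` records."""
    return sum(1 for n in range(1, num_records + 1) if n % every == 0)


# An instance that sees only two records before the timeout:
print(progress_lines(2, 100))  # 0 -> the test never sees a "processed" line
print(progress_lines(2, 1))    # 2 -> one line per record, so the test can proceed
```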

##########
File path: streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SmokeTestClient extends SmokeTestUtil {
+
+    public static final int ONE_DAY = 24 * 60 * 60 * 1000;
+    public static final long TWO_DAYS = 2L * ONE_DAY;
+    private final String name;
+
+    private KafkaStreams streams;
+    private boolean uncaughtException = false;
+    private boolean started;
+    private volatile boolean closed;
+
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
+    }
+
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
+    }
+
+    public boolean started() {
+        return started;
+    }
+
+    public boolean closed() {
+        return closed;
+    }
+
+    public void start(final Properties streamsProperties) {
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
+        streams.setUncaughtExceptionHandler((t, e) -> {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
+            uncaughtException = true;
+            streams.close(30, TimeUnit.SECONDS);
+        });
+
+        addShutdownHook("streams-shutdown-hook", this::close);
+
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
+    }
+
+    public void close() {
+        final boolean closed = streams.close(1, TimeUnit.MINUTES);
+
+        if (closed && !uncaughtException) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
+        } else if (closed) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
+        }
+    }
+
+    private Properties getStreamsConfig(final Properties props) {
+        final Properties fullProps = new Properties(props);
+        fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+        fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
+        fullProps.putAll(props);
+        return fullProps;
+    }
+
+    public Topology getTopology() {

Review comment:
       For 2.0, I had to make a few changes to the app:
   1. Convert the "Duration" APIs to the equivalent "long" or "TimeUnit" APIs in 2.0
   2. Drop the suppress/grace/retention features. This technically breaks the application upgrade because of topology numbering clashes, but since we're not verifying the results, the tests pass anyway. Realistically, a user would do the upgrade first and then add suppression in a separate upgrade, in conjunction with an app reset. Or, following the upgrade guide, they would name the suppression nodes. I'd like to defer simulating this until we add validation to this test. Since the test passes right now anyway, there's no way to tell whether any further compatibility work I might do now actually functions properly.
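
For readers unfamiliar with the numbering clash in item 2, here is a deliberately simplified model in plain Python. It assumes generated names take an index from one shared counter; the operator labels and the `auto_names` helper are hypothetical, and the real name generation in Kafka Streams has more detail than this.

```python
def auto_names(operators):
    """Assign each operator a name with a zero-padded index from one shared counter."""
    return [f"{op}-{i:010d}" for i, op in enumerate(operators)]


with_suppress = auto_names(["SOURCE", "AGGREGATE", "SUPPRESS", "SINK"])
without_suppress = auto_names(["SOURCE", "AGGREGATE", "SINK"])

print(with_suppress[-1])     # SINK-0000000003
print(without_suppress[-1])  # SINK-0000000002
```

In this model, dropping one operator shifts the index of everything after it, which is why explicitly naming the suppression nodes avoids the clash.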



