You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/22 00:28:51 UTC

[kafka] branch trunk updated: MINOR: Ignore test_broker_type_bounce_at_start system test (#5055)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70a506b  MINOR: Ignore test_broker_type_bounce_at_start system test (#5055)
70a506b is described below

commit 70a506b9839605304a0a67b70e711a1139b47cbe
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon May 21 17:28:40 2018 -0700

    MINOR: Ignore test_broker_type_bounce_at_start system test (#5055)
    
    test_broker_type_bounce_at_start tries to validate that when the controller is down, the streams client will always fail trying to create the topic; with the current behavior of admin client it is actually not always true: the actual behavior depends on the admin client internals as well as when the controller becomes unavailable during the leader assign partitions phase. I'd suggest at least ignore this test for now until the admin client has more stable (personally I'd even suggest  [...]
    
    Also adding a few more log4j entries as a result of investigating this issue.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java   | 5 ++++-
 .../kafka/streams/processor/internals/InternalTopicManager.java      | 4 ++++
 .../kafka/streams/processor/internals/StreamsPartitionAssignor.java  | 5 ++---
 tests/kafkatest/tests/streams/streams_broker_bounce_test.py          | 1 +
 4 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5f4eefe..00f543c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -577,7 +577,9 @@ public class KafkaAdminClient extends AdminClient {
             // this RPC. That is why 'tries' is not incremented.
             if ((throwable instanceof UnsupportedVersionException) &&
                      handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
-                log.trace("{} attempting protocol downgrade.", this);
+                if (log.isDebugEnabled()) {
+                    log.debug("{} attempting protocol downgrade and then retry.", this);
+                }
                 runnable.enqueue(this, now);
                 return;
             }
@@ -813,6 +815,7 @@ public class KafkaAdminClient extends AdminClient {
          * @param pendingIter   An iterator yielding pending calls.
          */
         private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) {
+            log.trace("Trying to choose nodes for {} at {}", pendingIter, now);
             while (pendingIter.hasNext()) {
                 Call call = pendingIter.next();
                 Node node = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 2ac37bd..2c2df04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -108,6 +108,7 @@ public class InternalTopicManager {
                     .configs(topicConfig));
             }
 
+            // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client
             int remainingRetries = retries;
             boolean retry;
             do {
@@ -171,6 +172,9 @@ public class InternalTopicManager {
      */
     // visible for testing
     protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
+        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
+
+        // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client
         int remainingRetries = retries;
         boolean retry;
         do {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index d7a9b33..e1464e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -718,9 +718,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
      *
      * @param topicPartitions Map that contains the topic names to be created with the number of partitions
      */
-    @SuppressWarnings("deprecation")
     private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
-        log.debug("Starting to validate internal topics in partition assignor.");
+        log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions);
 
         // first construct the topics to make ready
         final Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>();
@@ -744,7 +743,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             internalTopicManager.makeReady(topicsToMakeReady);
         }
 
-        log.debug("Completed validating internal topics in partition assignor.");
+        log.debug("Completed validating internal topics {} in partition assignor.", topicPartitions);
     }
 
     private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups,
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 1415ecc..8d623eb 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -202,6 +202,7 @@ class StreamsBrokerBounceTest(Test):
 
         return self.collect_results(sleep_time_secs)
 
+    @ignore
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown"],
             broker_type=["controller"],

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.