Posted to commits@kafka.apache.org by ij...@apache.org on 2021/06/08 13:48:05 UTC

[kafka] branch 2.8 updated (ba4eed1 -> 2594686)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from ba4eed1  MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout` (#10759) (#10776)
     new 7f64a1c  MINOR: add ConfigUtils method for printing configurations (#10714)
     new 2594686  MINOR: Only log overridden topic configs during topic creation (#10828)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kafka/common/config/ConfigDef.java  |  6 ++++-
 .../org/apache/kafka/common/utils/ConfigUtils.java | 31 ++++++++++++++++++++++
 .../apache/kafka/common/utils/ConfigUtilsTest.java | 28 +++++++++++++++++++
 core/src/main/scala/kafka/log/LogConfig.scala      | 19 +++++++++----
 core/src/main/scala/kafka/log/LogManager.scala     |  2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  5 ++--
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 22 +++++++++++++++
 gradle/spotbugs-exclude.xml                        |  7 +++++
 8 files changed, 111 insertions(+), 9 deletions(-)

[kafka] 01/02: MINOR: add ConfigUtils method for printing configurations (#10714)

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7f64a1c8bf25724eb56366695f7c3a645a2e7dcb
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Wed May 19 11:03:49 2021 -0700

    MINOR: add ConfigUtils method for printing configurations (#10714)
    
    Reviewers: Luke Chen <sh...@gmail.com>, David Arthur <mu...@gmail.com>
---
 .../org/apache/kafka/common/config/ConfigDef.java  |  6 ++++-
 .../org/apache/kafka/common/utils/ConfigUtils.java | 31 ++++++++++++++++++++++
 .../apache/kafka/common/utils/ConfigUtilsTest.java | 28 +++++++++++++++++++
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  5 ++--
 4 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 156e08f..4fd6954 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -802,7 +802,11 @@ public class ConfigDef {
      * The config types
      */
     public enum Type {
-        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
+        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD;
+
+        public boolean isSensitive() {
+            return this == PASSWORD;
+        }
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
index 504a1f0..0f839ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
@@ -17,10 +17,15 @@
 
 package org.apache.kafka.common.utils;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -113,4 +118,30 @@ public class ConfigUtils {
 
         return newConfigs;
     }
+
+    public static String configMapToRedactedString(Map<String, Object> map, ConfigDef configDef) {
+        StringBuilder bld = new StringBuilder("{");
+        List<String> keys = new ArrayList<>(map.keySet());
+        Collections.sort(keys);
+        String prefix = "";
+        for (String key : keys) {
+            bld.append(prefix).append(key).append("=");
+            ConfigKey configKey = configDef.configKeys().get(key);
+            if (configKey == null || configKey.type().isSensitive()) {
+                bld.append("(redacted)");
+            } else {
+                Object value = map.get(key);
+                if (value == null) {
+                    bld.append("null");
+                } else if (configKey.type() == Type.STRING) {
+                    bld.append("\"").append(value).append("\"");
+                } else {
+                    bld.append(value);
+                }
+            }
+            prefix = ", ";
+        }
+        bld.append("}");
+        return bld.toString();
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java
index b7279bb..d760330 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java
@@ -17,8 +17,12 @@
 
 package org.apache.kafka.common.utils;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -140,4 +144,28 @@ public class ConfigUtilsTest {
         assertNull(newConfig.get("foo.bar.deprecated"));
         assertNull(newConfig.get("foo.bar.even.more.deprecated"));
     }
+
+    private static final ConfigDef CONFIG = new ConfigDef().
+        define("myPassword", Type.PASSWORD, Importance.HIGH, "").
+        define("myString", Type.STRING, Importance.HIGH, "").
+        define("myInt", Type.INT, Importance.HIGH, "").
+        define("myString2", Type.STRING, Importance.HIGH, "");
+
+    @Test
+    public void testConfigMapToRedactedStringForEmptyMap() {
+        assertEquals("{}", ConfigUtils.
+            configMapToRedactedString(Collections.emptyMap(), CONFIG));
+    }
+
+    @Test
+    public void testConfigMapToRedactedStringWithSecrets() {
+        Map<String, Object> testMap1 = new HashMap<>();
+        testMap1.put("myString", "whatever");
+        testMap1.put("myInt", Integer.valueOf(123));
+        testMap1.put("myPassword", "foosecret");
+        testMap1.put("myString2", null);
+        testMap1.put("myUnknown", Integer.valueOf(456));
+        assertEquals("{myInt=123, myPassword=(redacted), myString=\"whatever\", myString2=null, myUnknown=(redacted)}",
+            ConfigUtils.configMapToRedactedString(testMap1, CONFIG));
+    }
 }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 91d214f..2cf24c8 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{ConfigUtils, Utils}
 
 import scala.collection._
 import scala.jdk.CollectionConverters._
@@ -601,7 +601,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     }
 
     if (!validateOnly) {
-      info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames custom configs: $newCustomConfigs")
+      info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames " +
+           s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}")
       reconfigurable.reconfigure(newConfigs)
     }
   }
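
For readers skimming the patch above, here is a minimal standalone sketch (not part of the commit) of how the new ConfigUtils.configMapToRedactedString helper behaves. The config names ("listener.password", "listener.name", "some.unknown.key") and the RedactionExample object are made up for illustration only:

    import java.util.{HashMap => JHashMap}
    import org.apache.kafka.common.config.ConfigDef
    import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
    import org.apache.kafka.common.utils.ConfigUtils

    object RedactionExample extends App {
      // One sensitive (PASSWORD) key and one plain STRING key; any key not
      // defined in the ConfigDef is treated as unknown and redacted as well.
      val configDef = new ConfigDef().
        define("listener.password", Type.PASSWORD, Importance.HIGH, "").
        define("listener.name", Type.STRING, Importance.HIGH, "")

      val configs = new JHashMap[String, AnyRef]()
      configs.put("listener.password", "secret")
      configs.put("listener.name", "internal")
      configs.put("some.unknown.key", "value")

      // Keys are sorted, STRING values are quoted, PASSWORD and unknown values
      // are replaced with "(redacted)"; expected output:
      // {listener.name="internal", listener.password=(redacted), some.unknown.key=(redacted)}
      println(ConfigUtils.configMapToRedactedString(configs, configDef))
    }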

[kafka] 02/02: MINOR: Only log overridden topic configs during topic creation (#10828)

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2594686cd60f5eb1900df89922f3844fda8c3c0b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Tue Jun 8 06:39:31 2021 -0700

    MINOR: Only log overridden topic configs during topic creation (#10828)
    
    It's quite verbose to include all configs for every partition loaded/created.
    Also make sure to redact sensitive and unknown config values.
    
    Unit test included.
    
    Reviewers: David Jacot <dj...@confluent.io>, Kowshik Prakasam <kp...@confluent.io>, Luke Chen <sh...@gmail.com>
---
 core/src/main/scala/kafka/log/LogConfig.scala      | 19 ++++++++++++++-----
 core/src/main/scala/kafka/log/LogManager.scala     |  2 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 22 ++++++++++++++++++++++
 gradle/spotbugs-exclude.xml                        |  7 +++++++
 4 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c2ab1d8..60e813d 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -17,20 +17,21 @@
 
 package kafka.log
 
-import java.util.{Collections, Locale, Properties}
-
-import scala.jdk.CollectionConverters._
 import kafka.api.{ApiVersion, ApiVersionValidator}
+import kafka.log.LogConfig.configDef
 import kafka.message.BrokerCompressionCodec
 import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
+import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator}
 import org.apache.kafka.common.record.{LegacyRecord, TimestampType}
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{ConfigUtils, Utils}
 
 import scala.collection.{Map, mutable}
-import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator}
+import scala.jdk.CollectionConverters._
+
+import java.util.{Collections, Locale, Properties}
 
 object Defaults {
   val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -108,6 +109,14 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
     if (compact && maxCompactionLagMs > 0) math.min(maxCompactionLagMs, segmentMs)
     else segmentMs
   }
+
+  def overriddenConfigsAsLoggableString: String = {
+    val overriddenTopicProps = props.asScala.collect {
+      case (k: String, v) if overriddenConfigs.contains(k) => (k, v.asInstanceOf[AnyRef])
+    }
+    ConfigUtils.configMapToRedactedString(overriddenTopicProps.asJava, configDef)
+  }
+
 }
 
 object LogConfig {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 1ca4d7e..38266ed 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -834,7 +834,7 @@ class LogManager(logDirs: Seq[File],
         else
           currentLogs.put(topicPartition, log)
 
-        info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(", ")}}.")
+        info(s"Created log for partition $topicPartition in $logDir with properties ${config.overriddenConfigsAsLoggableString}")
         // Remove the preferred log dir since it has already been satisfied
         preferredLogDirs.remove(topicPartition)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 069547d..19c0b93 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -162,6 +162,28 @@ class LogConfigTest {
     assertNull(nullServerDefault)
   }
 
+  @Test
+  def testOverriddenConfigsAsLoggableString(): Unit = {
+    val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    kafkaProps.put("unknown.broker.password.config", "aaaaa")
+    kafkaProps.put(KafkaConfig.SslKeyPasswordProp, "somekeypassword")
+    kafkaProps.put(KafkaConfig.LogRetentionBytesProp, "50")
+    val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+    val topicOverrides = new Properties
+    // Only set as a topic config
+    topicOverrides.setProperty(LogConfig.MinInSyncReplicasProp, "2")
+    // Overrides value from broker config
+    topicOverrides.setProperty(LogConfig.RetentionBytesProp, "100")
+    // Unknown topic config, but known broker config
+    topicOverrides.setProperty(KafkaConfig.SslTruststorePasswordProp, "sometrustpasswrd")
+    // Unknown config
+    topicOverrides.setProperty("unknown.topic.password.config", "bbbb")
+    // We don't currently have any sensitive topic configs, if we add them, we should set one here
+    val logConfig = LogConfig.fromProps(LogConfig.extractLogConfigMap(kafkaConfig), topicOverrides)
+    assertEquals("{min.insync.replicas=2, retention.bytes=100, ssl.truststore.password=(redacted), unknown.topic.password.config=(redacted)}",
+      logConfig.overriddenConfigsAsLoggableString)
+  }
+
   private def isValid(configValue: String): Boolean = {
     try {
       ThrottledReplicaListValidator.ensureValidString("", configValue)
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index ab60dfd..3b7b5d3 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -159,6 +159,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
+        <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
+        <Source name="LogConfig.scala"/>
+        <Package name="kafka.log"/>
+        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
+    </Match>
+
+    <Match>
         <!-- offsets is a lazy val and it confuses spotBugs with its locking scheme -->
         <Class name="kafka.server.checkpoints.LazyOffsetCheckpointMap"/>
         <Bug pattern="IS2_INCONSISTENT_SYNC"/>