You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2021/02/20 03:45:27 UTC

[kafka] branch trunk updated: MINOR: apply Utils.isBlank to code base (#10124)

This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 954c090  MINOR: apply Utils.isBlank to code base (#10124)
954c090 is described below

commit 954c090ffc378a63ce3c3c9a72b87724fcd2cd6c
Author: CHUN-HAO TANG <ta...@gmail.com>
AuthorDate: Sat Feb 20 11:44:29 2021 +0800

    MINOR: apply Utils.isBlank to code base (#10124)
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>
---
 .../java/org/apache/kafka/clients/admin/KafkaAdminClient.java    | 2 +-
 .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java    | 4 ++--
 .../oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java | 7 ++++---
 .../unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java      | 9 +++------
 .../unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java  | 3 +--
 .../internals/unsecured/OAuthBearerScopeUtilsTest.java           | 3 ++-
 .../api/src/main/java/org/apache/kafka/connect/data/Values.java  | 4 ++--
 .../main/java/org/apache/kafka/connect/health/AbstractState.java | 6 ++++--
 .../java/org/apache/kafka/connect/health/ConnectorHealth.java    | 5 +++--
 .../src/test/java/org/apache/kafka/connect/data/ValuesTest.java  | 3 ++-
 .../rest/basic/auth/extension/PropertyFileLoginModule.java       | 3 ++-
 .../java/org/apache/kafka/connect/runtime/ConnectorConfig.java   | 2 +-
 .../org/apache/kafka/connect/runtime/SinkConnectorConfig.java    | 7 ++++---
 .../main/java/org/apache/kafka/connect/runtime/WorkerConfig.java | 5 +++--
 .../kafka/connect/runtime/distributed/DistributedHerder.java     | 2 +-
 .../java/org/apache/kafka/connect/runtime/isolation/Plugins.java | 3 +--
 .../java/org/apache/kafka/connect/runtime/rest/RestServer.java   | 7 ++++---
 .../org/apache/kafka/connect/transforms/TimestampConverter.java  | 4 ++--
 core/src/main/scala/kafka/network/SocketServer.scala             | 4 ++--
 core/src/main/scala/kafka/server/KafkaServer.scala               | 4 ++--
 core/src/main/scala/kafka/utils/Log4jController.scala            | 5 +++--
 .../src/main/java/org/apache/kafka/streams/kstream/Printed.java  | 3 ++-
 .../src/main/java/org/apache/kafka/streams/state/HostInfo.java   | 3 ++-
 23 files changed, 53 insertions(+), 45 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3296fb8..60c8d63 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4540,7 +4540,7 @@ public class KafkaAdminClient extends AdminClient {
         final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
         for (final Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
             final String feature = entry.getKey();
-            if (feature.trim().isEmpty()) {
+            if (Utils.isBlank(feature)) {
                 throw new IllegalArgumentException("Provided feature can not be empty.");
             }
             updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e60eebe..f7760bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -957,7 +957,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 this.unsubscribe();
             } else {
                 for (String topic : topics) {
-                    if (topic == null || topic.trim().isEmpty())
+                    if (Utils.isBlank(topic))
                         throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                 }
 
@@ -1108,7 +1108,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             } else {
                 for (TopicPartition tp : partitions) {
                     String topic = (tp != null) ? tp.topic() : null;
-                    if (topic == null || topic.trim().isEmpty())
+                    if (Utils.isBlank(topic))
                         throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                 }
                 fetcher.clearBufferedDataForUnassignedPartitions(partitions);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
index b5b3016..fa175b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
@@ -31,6 +31,7 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.utils.Utils;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -103,7 +104,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
                     OAuthBearerValidationResult.newFailure("No expiration time in JWT"));
         lifetime = convertClaimTimeInSecondsToMs(expirationTimeSeconds);
         String principalName = claim(this.principalClaimName, String.class);
-        if (principalName == null || principalName.trim().isEmpty())
+        if (Utils.isBlank(principalName))
             throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult
                     .newFailure("No principal name in JWT claim: " + this.principalClaimName));
         this.principalName = principalName;
@@ -345,7 +346,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
         String scopeClaimName = scopeClaimName();
         if (isClaimType(scopeClaimName, String.class)) {
             String scopeClaimValue = claim(scopeClaimName, String.class);
-            if (scopeClaimValue.trim().isEmpty())
+            if (Utils.isBlank(scopeClaimValue))
                 return Collections.emptySet();
             else {
                 Set<String> retval = new HashSet<>();
@@ -360,7 +361,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
         List<String> stringList = (List<String>) scopeClaimValue;
         Set<String> retval = new HashSet<>();
         for (String scope : stringList) {
-            if (scope != null && !scope.trim().isEmpty()) {
+            if (!Utils.isBlank(scope)) {
                 retval.add(scope.trim());
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index e7a4f2c..eb4c7db 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
 import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -191,13 +192,9 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
             throw new OAuthBearerConfigException("Extensions provided in login context without a token");
         }
         String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION);
-        String principalClaimName = principalClaimNameValue != null && !principalClaimNameValue.trim().isEmpty()
-                ? principalClaimNameValue.trim()
-                : DEFAULT_PRINCIPAL_CLAIM_NAME;
+        String principalClaimName = Utils.isBlank(principalClaimNameValue) ? DEFAULT_PRINCIPAL_CLAIM_NAME : principalClaimNameValue.trim();
         String scopeClaimNameValue = optionValue(SCOPE_CLAIM_NAME_OPTION);
-        String scopeClaimName = scopeClaimNameValue != null && !scopeClaimNameValue.trim().isEmpty()
-                ? scopeClaimNameValue.trim()
-                : DEFAULT_SCOPE_CLAIM_NAME;
+        String scopeClaimName = Utils.isBlank(scopeClaimNameValue) ? DEFAULT_SCOPE_CLAIM_NAME : scopeClaimNameValue.trim();
         String headerJson = "{" + claimOrHeaderJsonText("alg", "none") + "}";
         String lifetimeSecondsValueToUse = optionValue(LIFETIME_SECONDS_OPTION, DEFAULT_LIFETIME_SECONDS_ONE_HOUR);
         String claimsJson;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
index a9dc059..7a81521 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
@@ -195,8 +195,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
         String allowableClockSkewMsValue = option(ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION);
         int allowableClockSkewMs = 0;
         try {
-            allowableClockSkewMs = allowableClockSkewMsValue == null || allowableClockSkewMsValue.trim().isEmpty() ? 0
-                    : Integer.parseInt(allowableClockSkewMsValue.trim());
+            allowableClockSkewMs = Utils.isBlank(allowableClockSkewMsValue) ? 0 : Integer.parseInt(allowableClockSkewMsValue.trim());
         } catch (NumberFormatException e) {
             throw new OAuthBearerConfigException(e.getMessage(), e);
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerScopeUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerScopeUtilsTest.java
index 6c1496b..f65440e 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerScopeUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerScopeUtilsTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.List;
 
+import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 
 public class OAuthBearerScopeUtilsTest {
@@ -28,7 +29,7 @@ public class OAuthBearerScopeUtilsTest {
     public void validScope() {
         for (String validScope : new String[] {"", "   ", "scope1", " scope1 ", "scope1 Scope2", "scope1   Scope2"}) {
             List<String> parsedScope = OAuthBearerScopeUtils.parseScope(validScope);
-            if (validScope.trim().isEmpty()) {
+            if (Utils.isBlank(validScope)) {
                 assertTrue(parsedScope.isEmpty());
             } else if (validScope.contains("Scope2")) {
                 assertTrue(parsedScope.size() == 2 && parsedScope.get(0).equals("scope1")
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 067c91b..31f4183 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -891,7 +891,7 @@ public class Values {
         }
 
         String token = parser.next();
-        if (token.trim().isEmpty()) {
+        if (Utils.isBlank(token)) {
             return new SchemaAndValue(Schema.STRING_SCHEMA, token);
         }
         token = token.trim();
@@ -1253,7 +1253,7 @@ public class Values {
                 nextToken = consumeNextToken();
             }
             if (ignoreLeadingAndTrailingWhitespace) {
-                while (nextToken.trim().isEmpty() && canConsumeNextToken()) {
+                while (Utils.isBlank(nextToken) && canConsumeNextToken()) {
                     nextToken = consumeNextToken();
                 }
             }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
index f707b3c..ff65715 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
@@ -19,6 +19,8 @@ package org.apache.kafka.connect.health;
 
 import java.util.Objects;
 
+import org.apache.kafka.common.utils.Utils;
+
 /**
  * Provides the current status along with identifier for Connect worker and tasks.
  */
@@ -36,10 +38,10 @@ public abstract class AbstractState {
      * @param traceMessage  any error trace message associated with the connector or the task; may be null or empty
      */
     public AbstractState(String state, String workerId, String traceMessage) {
-        if (state == null || state.trim().isEmpty()) {
+        if (Utils.isBlank(state)) {
             throw new IllegalArgumentException("State must not be null or empty");
         }
-        if (workerId == null || workerId.trim().isEmpty()) {
+        if (Utils.isBlank(workerId)) {
             throw new IllegalArgumentException("Worker ID must not be null or empty");
         }
         this.state = state;
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
index 12fa6b7..1f78157 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
@@ -16,10 +16,11 @@
  */
 package org.apache.kafka.connect.health;
 
-
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.kafka.common.utils.Utils;
+
 /**
  * Provides basic health information about the connector and its tasks.
  */
@@ -35,7 +36,7 @@ public class ConnectorHealth {
                            ConnectorState connectorState,
                            Map<Integer, TaskState> tasks,
                            ConnectorType type) {
-        if (name == null || name.trim().isEmpty()) {
+        if (Utils.isBlank(name)) {
             throw new IllegalArgumentException("Connector name is required");
         }
         Objects.requireNonNull(connectorState, "connectorState can't be null");
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index 01fbae7..3700a6e 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.data;
 
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.Values.Parser;
 import org.apache.kafka.connect.errors.DataException;
@@ -911,7 +912,7 @@ public class ValuesTest {
 
     protected void assertConsumable(Parser parser, String... expectedTokens) {
         for (String expectedToken : expectedTokens) {
-            if (!expectedToken.trim().isEmpty()) {
+            if (!Utils.isBlank(expectedToken)) {
                 int position = parser.mark();
                 assertTrue(parser.canConsume(expectedToken.trim()));
                 parser.rewindTo(position);
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
index 8a26dc3..8b8e324 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.rest.basic.auth.extension;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +60,7 @@ public class PropertyFileLoginModule implements LoginModule {
     public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
         this.callbackHandler = callbackHandler;
         fileName = (String) options.get(FILE_OPTIONS);
-        if (fileName == null || fileName.trim().isEmpty()) {
+        if (Utils.isBlank(fileName)) {
             throw new ConfigException("Property Credentials file must be specified");
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index ca9d33c..4ba1ddd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -494,7 +494,7 @@ public class ConnectorConfig extends AbstractConfig {
                         .filter(c -> Modifier.isPublic(c.getModifiers()))
                         .map(Class::getName)
                         .collect(Collectors.joining(", "));
-                String message = childClassNames.trim().isEmpty() ?
+                String message = Utils.isBlank(childClassNames) ?
                         aliasKind + " is abstract and cannot be created." :
                         aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
                 throw new ConfigException(key, String.valueOf(cls), message);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 415d46f..93c2cb4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -125,17 +126,17 @@ public class SinkConnectorConfig extends ConnectorConfig {
 
     public static boolean hasTopicsConfig(Map<String, String> props) {
         String topicsStr = props.get(TOPICS_CONFIG);
-        return topicsStr != null && !topicsStr.trim().isEmpty();
+        return !Utils.isBlank(topicsStr);
     }
 
     public static boolean hasTopicsRegexConfig(Map<String, String> props) {
         String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
-        return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
+        return !Utils.isBlank(topicsRegexStr);
     }
 
     public static boolean hasDlqTopicConfig(Map<String, String> props) {
         String dqlTopicStr = props.get(DLQ_TOPIC_NAME_CONFIG);
-        return dqlTopicStr != null && !dqlTopicStr.trim().isEmpty();
+        return !Utils.isBlank(dqlTopicStr);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 58a9ce3..e140e59 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.storage.Converter;
@@ -516,7 +517,7 @@ public class WorkerConfig extends AbstractConfig {
                 if (!(item instanceof String)) {
                     throw new ConfigException("Invalid type for admin listener (expected String).");
                 }
-                if (((String) item).trim().isEmpty()) {
+                if (Utils.isBlank((String) item)) {
                     throw new ConfigException("Empty listener found when parsing list.");
                 }
             }
@@ -527,7 +528,7 @@ public class WorkerConfig extends AbstractConfig {
         @Override
         public void ensureValid(String name, Object value) {
             String strValue = (String) value;
-            if (strValue == null || strValue.trim().isEmpty()) {
+            if (Utils.isBlank(strValue)) {
                 return;
             }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 16dfbf9..b4dfb4d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1447,7 +1447,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                     forwardRequestExecutor.submit(() -> {
                         try {
                             String leaderUrl = leaderUrl();
-                            if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
+                            if (Utils.isBlank(leaderUrl)) {
                                 cb.onCompletion(new ConnectException("Request to leader to " +
                                         "reconfigure connector tasks failed " +
                                         "because the URL of the leader's REST interface is empty!"), null);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index d507059..6ab8a76 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -445,8 +445,7 @@ public class Plugins {
             plugin = newPlugin(klass);
             if (plugin instanceof Versioned) {
                 Versioned versionedPlugin = (Versioned) plugin;
-                if (versionedPlugin.version() == null || versionedPlugin.version().trim()
-                    .isEmpty()) {
+                if (Utils.isBlank(versionedPlugin.version())) {
                     throw new ConnectException("Version not defined for '" + klassName + "'");
                 }
             }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 136c616..8f371bb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.health.ConnectClusterDetails;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
@@ -275,19 +276,19 @@ public class RestServer {
         }
 
         String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
-        if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
+        if (!Utils.isBlank(allowedOrigins)) {
             FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
             filterHolder.setName("cross-origin");
             filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins);
             String allowedMethods = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG);
-            if (allowedMethods != null && !allowedOrigins.trim().isEmpty()) {
+            if (!Utils.isBlank(allowedMethods)) {
                 filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods);
             }
             context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
         }
 
         String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
-        if (headerConfig != null && !headerConfig.trim().isEmpty()) {
+        if (!Utils.isBlank(headerConfig)) {
             configureHttpResponsHeaderFilter(context);
         }
 
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index b92dc84..a8d5cec 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -255,11 +255,11 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
             throw new ConfigException("Unknown timestamp type in TimestampConverter: " + type + ". Valid values are "
                     + Utils.join(VALID_TYPES, ", ") + ".");
         }
-        if (type.equals(TYPE_STRING) && formatPattern.trim().isEmpty()) {
+        if (type.equals(TYPE_STRING) && Utils.isBlank(formatPattern)) {
             throw new ConfigException("TimestampConverter requires format option to be specified when using string timestamps");
         }
         SimpleDateFormat format = null;
-        if (formatPattern != null && !formatPattern.trim().isEmpty()) {
+        if (!Utils.isBlank(formatPattern)) {
             try {
                 format = new SimpleDateFormat(formatPattern);
                 format.setTimeZone(UTC);
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 24df39f..a4a990c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ClientI
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, RequestHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
+import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
 import org.slf4j.event.Level
 
@@ -650,7 +650,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
    */
   private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
     val socketAddress =
-      if (host == null || host.trim.isEmpty)
+      if (Utils.isBlank(host))
         new InetSocketAddress(port)
       else
         new InetSocketAddress(host, port)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4daee08..90c57f2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledSh
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
-import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
+import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, Node}
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.server.authorizer.Authorizer
@@ -477,7 +477,7 @@ class KafkaServer(
     }
 
     val updatedEndpoints = listeners.map(endpoint =>
-      if (endpoint.host == null || endpoint.host.trim.isEmpty)
+      if (Utils.isBlank(endpoint.host))
         endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
       else
         endpoint
diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/Log4jController.scala
index a02fdb6..0d54c74 100755
--- a/core/src/main/scala/kafka/utils/Log4jController.scala
+++ b/core/src/main/scala/kafka/utils/Log4jController.scala
@@ -20,6 +20,7 @@ package kafka.utils
 import java.util
 import java.util.Locale
 
+import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.{Level, LogManager, Logger}
 
 import scala.collection.mutable
@@ -71,7 +72,7 @@ object Log4jController {
     */
   def logLevel(loggerName: String, logLevel: String): Boolean = {
     val log = existingLogger(loggerName)
-    if (!loggerName.trim.isEmpty && !logLevel.trim.isEmpty && log != null) {
+    if (!Utils.isBlank(loggerName) && !Utils.isBlank(logLevel) && log != null) {
       log.setLevel(Level.toLevel(logLevel.toUpperCase(Locale.ROOT)))
       true
     }
@@ -80,7 +81,7 @@ object Log4jController {
 
   def unsetLogLevel(loggerName: String): Boolean = {
     val log = existingLogger(loggerName)
-    if (!loggerName.trim.isEmpty && log != null) {
+    if (!Utils.isBlank(loggerName) && log != null) {
       log.setLevel(null)
       true
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
index fdcd9cb..6a3d1e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyException;
 
 import java.io.IOException;
@@ -63,7 +64,7 @@ public class Printed<K, V> implements NamedOperation<Printed<K, V>> {
      */
     public static <K, V> Printed<K, V> toFile(final String filePath) {
         Objects.requireNonNull(filePath, "filePath can't be null");
-        if (filePath.trim().isEmpty()) {
+        if (Utils.isBlank(filePath)) {
             throw new TopologyException("filePath can't be an empty string");
         }
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 6293cf5..70bdcda 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -21,6 +21,7 @@ import static org.apache.kafka.common.utils.Utils.getPort;
 
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
@@ -55,7 +56,7 @@ public class HostInfo {
      * @return a new HostInfo or null if endPoint is null or has no characters
      */
     public static HostInfo buildFromEndpoint(final String endPoint) {
-        if (endPoint == null || endPoint.trim().isEmpty()) {
+        if (Utils.isBlank(endPoint)) {
             return null;
         }