You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/01/28 18:58:07 UTC

[atlas] branch master updated: ATLAS-4111: Add 'replicationFactor' attribute to kafka_topic entity type + formatting changes

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new ce1342d  ATLAS-4111: Add 'replicationFactor' attribute to kafka_topic entity type + formatting changes
ce1342d is described below

commit ce1342dd6dd7cbc4d1e35bea3b26d08dc24bd67e
Author: Barnabas Maidics <b....@gmail.com>
AuthorDate: Mon Jan 25 14:26:45 2021 +0100

    ATLAS-4111: Add 'replicationFactor' attribute to kafka_topic entity type + formatting changes
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java |  66 ++++----
 .../patches/018-kafka_topic_add_rf_attribute.json  |  23 +++
 .../java/org/apache/atlas/utils/KafkaUtils.java    | 173 ++++++++++++++-------
 3 files changed, 174 insertions(+), 88 deletions(-)

diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index d22010d..bf74c67 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -54,24 +54,23 @@ import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
 public class KafkaBridge {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
-
-    private static final int    EXIT_CODE_SUCCESS          = 0;
-    private static final int    EXIT_CODE_FAILED           = 1;
-    private static final String ATLAS_ENDPOINT             = "atlas.rest.address";
-    private static final String DEFAULT_ATLAS_URL          = "http://localhost:21000/";
-    private static final String CLUSTER_NAME_KEY           = "atlas.cluster.name";
-    private static final String KAFKA_METADATA_NAMESPACE   = "atlas.metadata.namespace";
-    private static final String DEFAULT_CLUSTER_NAME       = "primary";
-    private static final String ATTRIBUTE_QUALIFIED_NAME   = "qualifiedName";
-    private static final String DESCRIPTION_ATTR           = "description";
-    private static final String PARTITION_COUNT            = "partitionCount";
-    private static final String NAME                       = "name";
-    private static final String URI                        = "uri";
-    private static final String CLUSTERNAME                = "clusterName";
-    private static final String TOPIC                      = "topic";
-
-    private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME       = "%s@%s";
+    private static final Logger LOG                               = LoggerFactory.getLogger(KafkaBridge.class);
+    private static final int    EXIT_CODE_SUCCESS                 = 0;
+    private static final int    EXIT_CODE_FAILED                  = 1;
+    private static final String ATLAS_ENDPOINT                    = "atlas.rest.address";
+    private static final String DEFAULT_ATLAS_URL                 = "http://localhost:21000/";
+    private static final String CLUSTER_NAME_KEY                  = "atlas.cluster.name";
+    private static final String KAFKA_METADATA_NAMESPACE          = "atlas.metadata.namespace";
+    private static final String DEFAULT_CLUSTER_NAME              = "primary";
+    private static final String ATTRIBUTE_QUALIFIED_NAME          = "qualifiedName";
+    private static final String DESCRIPTION_ATTR                  = "description";
+    private static final String PARTITION_COUNT                   = "partitionCount";
+    private static final String REPLICATION_FACTOR                = "replicationFactor";
+    private static final String NAME                              = "name";
+    private static final String URI                               = "uri";
+    private static final String CLUSTERNAME                       = "clusterName";
+    private static final String TOPIC                             = "topic";
+    private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
 
     private final List<String>  availableTopics;
     private final String        metadataNamespace;
@@ -80,9 +79,9 @@ public class KafkaBridge {
 
 
     public static void main(String[] args) {
-        int exitCode = EXIT_CODE_FAILED;
+        int           exitCode      = EXIT_CODE_FAILED;
         AtlasClientV2 atlasClientV2 = null;
-        KafkaUtils kafkaUtils = null;
+        KafkaUtils    kafkaUtils    = null;
 
         try {
             Options options = new Options();
@@ -114,14 +113,15 @@ public class KafkaBridge {
             kafkaUtils = new KafkaUtils(atlasConf);
 
             KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
+
             if (StringUtils.isNotEmpty(fileToImport)) {
                 File f = new File(fileToImport);
 
                 if (f.exists() && f.canRead()) {
                     BufferedReader br   = new BufferedReader(new FileReader(f));
-                    String         line = null;
+                    String         line;
 
-                    while((line = br.readLine()) != null) {
+                    while ((line = br.readLine()) != null) {
                         topicToImport = line.trim();
 
                         importer.importTopic(topicToImport);
@@ -138,15 +138,19 @@ public class KafkaBridge {
             }
         } catch(ParseException e) {
             LOG.error("Failed to parse arguments. Error: ", e.getMessage());
+
             printUsage();
         } catch(Exception e) {
             System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message");
+
             e.printStackTrace();
+
             LOG.error("ImportKafkaEntities failed", e);
         } finally {
             if (atlasClientV2 != null) {
                 atlasClientV2.close();
             }
+
             if (kafkaUtils != null) {
                 kafkaUtils.close();
             }
@@ -175,16 +179,18 @@ public class KafkaBridge {
 
         if (StringUtils.isNotEmpty(topicToImport)) {
             List<String> topics_subset = new ArrayList<>();
-            for(String topic : topics) {
+
+            for (String topic : topics) {
                 if (Pattern.compile(topicToImport).matcher(topic).matches()) {
                     topics_subset.add(topic);
                 }
             }
+
             topics = topics_subset;
         }
 
         if (CollectionUtils.isNotEmpty(topics)) {
-            for(String topic : topics) {
+            for (String topic : topics) {
                 createOrUpdateTopic(topic);
             }
         }
@@ -234,11 +240,14 @@ public class KafkaBridge {
         ret.setAttribute(NAME,topic);
         ret.setAttribute(DESCRIPTION_ATTR, topic);
         ret.setAttribute(URI, topic);
+
         try {
             ret.setAttribute(PARTITION_COUNT, kafkaUtils.getPartitionCount(topic));
+            ret.setAttribute(REPLICATION_FACTOR, kafkaUtils.getReplicationFactor(topic));
         } catch (ExecutionException | InterruptedException e) {
-            LOG.error("Error while getting partition count for topic :" + topic, e);
-            throw new Exception("Error while getting partition count for topic :" + topic, e);
+            LOG.error("Error while getting partition data for topic :" + topic, e);
+
+            throw new Exception("Error while getting partition data for topic :" + topic, e);
         }
 
         return ret;
@@ -254,6 +263,7 @@ public class KafkaBridge {
 
         try {
             ret = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
+
             clearRelationshipAttributes(ret);
         } catch (Exception e) {
             ret = null; // entity doesn't exist in Atlas
@@ -288,7 +298,7 @@ public class KafkaBridge {
 
     @VisibleForTesting
     AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
-        AtlasEntityWithExtInfo ret      = null;
+        AtlasEntityWithExtInfo ret;
         EntityMutationResponse response = atlasClientV2.updateEntity(entity);
 
         if (response != null) {
@@ -348,4 +358,4 @@ public class KafkaBridge {
             entity.getRelationshipAttributes().clear();
         }
     }
-}
+}
\ No newline at end of file
diff --git a/addons/models/1000-Hadoop/patches/018-kafka_topic_add_rf_attribute.json b/addons/models/1000-Hadoop/patches/018-kafka_topic_add_rf_attribute.json
new file mode 100644
index 0000000..cc74bad
--- /dev/null
+++ b/addons/models/1000-Hadoop/patches/018-kafka_topic_add_rf_attribute.json
@@ -0,0 +1,23 @@
+{
+  "patches": [
+    {
+      "id": "TYPEDEF_PATCH_1000_018_001",
+      "description": "Add 'replicationFactor' attribute to kafka_topic",
+      "action": "ADD_ATTRIBUTE",
+      "typeName": "kafka_topic",
+      "applyToVersion": "1.7",
+      "updateToVersion": "1.8",
+      "params": null,
+      "attributeDefs": [
+        {
+          "name": "replicationFactor",
+          "typeName": "int",
+          "cardinality": "SINGLE",
+          "isIndexable": true,
+          "isOptional": true,
+          "isUnique": false
+        }
+      ]
+    }
+  ]
+}
diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
index 14e205a..eea3311 100644
--- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -48,37 +48,37 @@ public class KafkaUtils implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
 
-    static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
-    private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
-    private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
-    private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
-    private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
+    private static final String JAAS_CONFIG_PREFIX_PARAM                     = "atlas.jaas";
+    private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM          = "loginModuleName";
+    private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM  = "loginModuleControlFlag";
+    private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG       = "required";
     private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
-    private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
-    private static final String JAAS_PRINCIPAL_PROP = "principal";
-    private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
-    private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
-    private static final String IMPORT_INTERNAL_TOPICS = "atlas.kafka.bridge.enable.internal.topics.import";
+    private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX             = "option";
+    private static final String JAAS_PRINCIPAL_PROP                          = "principal";
+    private static final String JAAS_DEFAULT_CLIENT_NAME                     = "KafkaClient";
+    private static final String JAAS_TICKET_BASED_CLIENT_NAME                = "ticketBased-KafkaClient";
+    private static final String IMPORT_INTERNAL_TOPICS                       = "atlas.kafka.bridge.enable.internal.topics.import";
 
-    public static final String ATLAS_KAFKA_PROPERTY_PREFIX   = "atlas.kafka";
-
-    final protected Properties kafkaConfiguration;
+    public static final String ATLAS_KAFKA_PROPERTY_PREFIX     = "atlas.kafka";
+    public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
 
+    final protected Properties  kafkaConfiguration;
     final protected AdminClient adminClient;
-
-    final protected boolean importInternalTopics;
+    final protected boolean     importInternalTopics;
 
     public KafkaUtils(Configuration atlasConfiguration) {
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("==> KafkaUtils() ");
         }
+
         this.kafkaConfiguration = ApplicationProperties.getSubsetAsProperties(atlasConfiguration, ATLAS_KAFKA_PROPERTY_PREFIX);
 
         setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration);
-        adminClient = AdminClient.create(this.kafkaConfiguration);
+
+        adminClient          = AdminClient.create(this.kafkaConfiguration);
         importInternalTopics = atlasConfiguration.getBoolean(IMPORT_INTERNAL_TOPICS, false);
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("<== KafkaUtils() ");
         }
     }
@@ -86,100 +86,143 @@ public class KafkaUtils implements AutoCloseable {
     public void createTopics(List<String> topicNames, int numPartitions, int replicationFactor)
             throws TopicExistsException, ExecutionException, InterruptedException {
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("==> createTopics() ");
         }
 
         List<NewTopic> newTopicList = topicNames.stream()
-                .map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor))
-                .collect(Collectors.toList());
+                                                .map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor))
+                                                .collect(Collectors.toList());
+
+        CreateTopicsResult             createTopicsResult = adminClient.createTopics(newTopicList);
+        Map<String, KafkaFuture<Void>> futureMap          = createTopicsResult.values();
 
-        CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList);
-        Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values();
-        for(Map.Entry<String, KafkaFuture<Void>> futureEntry : futureMap.entrySet()) {
-            String topicName = futureEntry.getKey();
+        for (Map.Entry<String, KafkaFuture<Void>> futureEntry : futureMap.entrySet()) {
             KafkaFuture<Void> future = futureEntry.getValue();
+
             future.get();
         }
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("<== createTopics() ");
         }
     }
 
     public List<String> listAllTopics() throws ExecutionException, InterruptedException {
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("==> KafkaUtils.listAllTopics() ");
         }
+
         ListTopicsResult listTopicsResult = adminClient.listTopics((new ListTopicsOptions()).listInternal(importInternalTopics));
-        List<String> topicNameList = new ArrayList<>(listTopicsResult.names().get());
+        List<String>     ret              = new ArrayList<>(listTopicsResult.names().get());
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("<== KafkaUtils.listAllTopics() ");
         }
 
-        return topicNameList;
+        return ret;
     }
 
     public Integer getPartitionCount(String topicName) throws ExecutionException, InterruptedException {
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("==> KafkaUtils.getPartitionCount({})", topicName);
         }
 
-        Integer partitionCount = null;
-        DescribeTopicsResult describeTopicsResult =  adminClient.describeTopics(Collections.singleton(topicName));
-        Map<String, KafkaFuture<TopicDescription>> futureMap = describeTopicsResult.values();
-        for(Map.Entry<String, KafkaFuture<TopicDescription>> futureEntry : futureMap.entrySet()) {
-            KafkaFuture<TopicDescription> topicDescriptionFuture = futureEntry.getValue();
-            TopicDescription topicDescription = topicDescriptionFuture.get();
-            List<TopicPartitionInfo> partitionList = topicDescription.partitions();
-            partitionCount = partitionList.size();
+        Integer                  ret            = null;
+        List<TopicPartitionInfo> partitionList  = getPartitionList(topicName);
+
+        if (partitionList != null) {
+            ret = partitionList.size();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, ret);
         }
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, partitionCount);
+        return ret;
+    }
+
+    public Integer getReplicationFactor(String topicName) throws ExecutionException, InterruptedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> KafkaUtils.getReplicationFactor({})", topicName);
+        }
+
+        Integer                  ret           = null;
+        List<TopicPartitionInfo> partitionList = getPartitionList(topicName);
+
+        if (partitionList != null) {
+            ret = partitionList.stream().mapToInt(x -> x.replicas().size()).max().getAsInt();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== KafkaUtils.getReplicationFactor returning for topic {} with replicationFactor {}", topicName, ret);
+        }
+
+        return ret;
+    }
+
+    private List<TopicPartitionInfo> getPartitionList(String topicName) throws ExecutionException, InterruptedException {
+        List<TopicPartitionInfo> ret                  = null;
+        DescribeTopicsResult     describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName));
+
+        if (describeTopicsResult != null) {
+            Map<String, KafkaFuture<TopicDescription>> futureMap = describeTopicsResult.values();
+
+            for (Map.Entry<String, KafkaFuture<TopicDescription>> futureEntry : futureMap.entrySet()) {
+                KafkaFuture<TopicDescription> topicDescriptionFuture = futureEntry.getValue();
+                TopicDescription              topicDescription       = topicDescriptionFuture.get();
+
+                ret = topicDescription.partitions();
+            }
         }
 
-        return partitionCount;
+        return ret;
     }
 
     public void close() {
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("==> KafkaUtils.close()");
         }
-        if(adminClient != null) {
+        if (adminClient != null) {
             adminClient.close();
         }
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("<== KafkaUtils.close()");
         }
     }
 
     public static void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("==> KafkaUtils.setKafkaJAASProperties()");
         }
 
-        if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
+        if (kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
             LOG.debug("JAAS config is already set, returning");
+
             return;
         }
 
         Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
         // JAAS Configuration is present then update set those properties in sasl.jaas.config
-        if(jaasConfig != null && !jaasConfig.isEmpty()) {
+
+        if (jaasConfig != null && !jaasConfig.isEmpty()) {
             String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
 
             // Required for backward compatability for Hive CLI
             if (!isLoginKeytabBased() && isLoginTicketBased()) {
-                LOG.debug("Checking if ticketBased-KafkaClient is set");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Checking if ticketBased-KafkaClient is set");
+                }
+
                 // if ticketBased-KafkaClient property is not specified then use the default client name
                 String        ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
                 Configuration ticketBasedConfig       = configuration.subset(ticketBasedConfigPrefix);
 
-                if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
-                    LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
+                if (ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
+                    }
 
                     jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
                 } else {
@@ -193,24 +236,31 @@ public class KafkaUtils implements AutoCloseable {
 
             if (loginModuleName == null) {
                 LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
+
                 return;
             }
 
             keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
+
             String controlFlag = jaasConfig.getProperty(keyParam);
 
-            if(StringUtils.isEmpty(controlFlag)) {
+            if (StringUtils.isEmpty(controlFlag)) {
                 String validValues = JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
+
                 controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
+
                 LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, validValues);
             }
-            String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
-            String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
-            int optionPrefixLen = optionPrefix.length();
+
+            String       optionPrefix       = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
+            String       principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
+            int          optionPrefixLen    = optionPrefix.length();
             StringBuffer optionStringBuffer = new StringBuffer();
+
             for (String key : jaasConfig.stringPropertyNames()) {
                 if (key.startsWith(optionPrefix)) {
                     String optionVal = jaasConfig.getProperty(key);
+
                     if (optionVal != null) {
                         optionVal = optionVal.trim();
 
@@ -223,16 +273,18 @@ public class KafkaUtils implements AutoCloseable {
                         }
 
                         optionVal = surroundWithQuotes(optionVal);
+
                         optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
                     }
                 }
             }
 
             String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
+
             kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
         }
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("<== KafkaUtils.setKafkaJAASProperties()");
         }
     }
@@ -262,17 +314,19 @@ public class KafkaUtils implements AutoCloseable {
     }
 
     static String surroundWithQuotes(String optionVal) {
-        if(StringUtils.isEmpty(optionVal)) {
+        if (StringUtils.isEmpty(optionVal)) {
             return optionVal;
         }
+
         String ret = optionVal;
 
         // For property values which have special chars like "@" or "/", we need to enclose it in
         // double quotes, so that Kafka can parse it
         // If the property is already enclosed in double quotes, then do nothing.
-        if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') {
+        if (optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') {
             // If the string as special characters like except _,-
             final String SPECIAL_CHAR_LIST = "/!@#%^&*";
+
             if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
                 ret = String.format("\"%s\"", optionVal);
             }
@@ -280,5 +334,4 @@ public class KafkaUtils implements AutoCloseable {
 
         return ret;
     }
-
-}
+}
\ No newline at end of file