You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by de...@apache.org on 2022/08/05 02:42:17 UTC

[hive] branch master updated: HIVE-26439: Skip collecting Kafka delegation tokens if PLAINTEXT is set (#3488) (Yu-Wen Lai, reviewed by Laszlo Bodor, Zhihua Deng)

This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aaba3c79e HIVE-26439: Skip collecting Kafka delegation tokens if PLAINTEXT is set (#3488) (Yu-Wen Lai, reviewed by  Laszlo Bodor, Zhihua Deng)
2aaba3c79e is described below

commit 2aaba3c79e740ef27fc263b5a8ff33ad679c5a12
Author: Yu-Wen <hs...@users.noreply.github.com>
AuthorDate: Fri Aug 5 10:42:08 2022 +0800

    HIVE-26439: Skip collecting Kafka delegation tokens if PLAINTEXT is set (#3488) (Yu-Wen Lai, reviewed by  Laszlo Bodor, Zhihua Deng)
---
 .../apache/hadoop/hive/ql/exec/tez/DagUtils.java   | 24 +++++++++++-----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index d8ac63acea..eb4eef03bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -312,20 +312,13 @@ public class DagUtils {
 
     Map<String, PartitionDesc> partitions = work.getAliasToPartnInfo();
 
-    for (PartitionDesc partition : partitions.values()) {
+    // We don't need to iterate on all partitions, and check the same TableDesc.
+    PartitionDesc partition = partitions.values().stream().findFirst().orElse(null);
+    if (partition != null) {
       TableDesc tableDesc = partition.getTableDesc();
-      boolean tokenCollected = collectKafkaDelegationTokenForTableDesc(dag, conf, tableDesc);
-      if (tokenCollected) {
+      if (collectKafkaDelegationTokenForTableDesc(dag, conf, tableDesc)) {
         // don't collect delegation token again, if it was already successful
         return;
-      } else {
-        /*
-         * We don't need to iterate on all partitions, and check the same TableDesc:
-         * if partitions[0].getTableDesc() doesn't show a kafka table, let's break the loop quickly.
-         * Note: at this point we cannot return from this method, as fileSinkTableDescs should
-         * be checked too.
-         */
-        break;
       }
     }
 
@@ -339,6 +332,7 @@ public class DagUtils {
 
   /**
    * Tries to collect delegation tokens for kafka in the scope of a TableDesc.
+   * If "security.protocol" is set to "PLAINTEXT", we don't need to collect delegation token at all.
    * @param dag
    * @param conf
    * @param tableDesc
@@ -346,7 +340,13 @@ public class DagUtils {
    */
   private boolean collectKafkaDelegationTokenForTableDesc(DAG dag, JobConf conf, TableDesc tableDesc) {
     String kafkaBrokers = (String) tableDesc.getProperties().get("kafka.bootstrap.servers"); //FIXME: KafkaTableProperties
-    if (kafkaBrokers != null && !kafkaBrokers.isEmpty()) {
+    String consumerSecurityProtocol = (String) tableDesc.getProperties().get(
+        "kafka.consumer." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+    String producerSecurityProtocol = (String) tableDesc.getProperties().get(
+        "kafka.producer." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+    if (kafkaBrokers != null && !kafkaBrokers.isEmpty() &&
+        !CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(consumerSecurityProtocol) &&
+        !CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(producerSecurityProtocol)) {
       getKafkaDelegationTokenForBrokers(dag, conf, kafkaBrokers);
       return true;
     }