You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by de...@apache.org on 2022/08/05 02:42:17 UTC
[hive] branch master updated: HIVE-26439: Skip collecting Kafka delegation tokens if PLAINTEXT is set (#3488) (Yu-Wen Lai, reviewed by Laszlo Bodor, Zhihua Deng)
This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2aaba3c79e HIVE-26439: Skip collecting Kafka delegation tokens if PLAINTEXT is set (#3488) (Yu-Wen Lai, reviewed by Laszlo Bodor, Zhihua Deng)
2aaba3c79e is described below
commit 2aaba3c79e740ef27fc263b5a8ff33ad679c5a12
Author: Yu-Wen <hs...@users.noreply.github.com>
AuthorDate: Fri Aug 5 10:42:08 2022 +0800
HIVE-26439: Skip collecting Kafka delegation tokens if PLAINTEXT is set (#3488) (Yu-Wen Lai, reviewed by Laszlo Bodor, Zhihua Deng)
---
.../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 24 +++++++++++-----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index d8ac63acea..eb4eef03bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -312,20 +312,13 @@ public class DagUtils {
Map<String, PartitionDesc> partitions = work.getAliasToPartnInfo();
- for (PartitionDesc partition : partitions.values()) {
+ // Checking the first partition's TableDesc is enough; there is no need to iterate over all partitions.
+ PartitionDesc partition = partitions.values().stream().findFirst().orElse(null);
+ if (partition != null) {
TableDesc tableDesc = partition.getTableDesc();
- boolean tokenCollected = collectKafkaDelegationTokenForTableDesc(dag, conf, tableDesc);
- if (tokenCollected) {
+ if (collectKafkaDelegationTokenForTableDesc(dag, conf, tableDesc)) {
// don't collect delegation token again, if it was already successful
return;
- } else {
- /*
- * We don't need to iterate on all partitions, and check the same TableDesc:
- * if partitions[0].getTableDesc() doesn't show a kafka table, let's break the loop quickly.
- * Note: at this point we cannot return from this method, as fileSinkTableDescs should
- * be checked too.
- */
- break;
}
}
@@ -339,6 +332,7 @@ public class DagUtils {
/**
* Tries to collect delegation tokens for kafka in the scope of a TableDesc.
+ * If either "kafka.consumer.security.protocol" or "kafka.producer.security.protocol" is set to "PLAINTEXT" (case-insensitive), no delegation token is collected.
* @param dag
* @param conf
* @param tableDesc
@@ -346,7 +340,13 @@ public class DagUtils {
*/
private boolean collectKafkaDelegationTokenForTableDesc(DAG dag, JobConf conf, TableDesc tableDesc) {
String kafkaBrokers = (String) tableDesc.getProperties().get("kafka.bootstrap.servers"); //FIXME: KafkaTableProperties
- if (kafkaBrokers != null && !kafkaBrokers.isEmpty()) {
+ String consumerSecurityProtocol = (String) tableDesc.getProperties().get(
+ "kafka.consumer." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+ String producerSecurityProtocol = (String) tableDesc.getProperties().get(
+ "kafka.producer." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+ if (kafkaBrokers != null && !kafkaBrokers.isEmpty() &&
+ !CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(consumerSecurityProtocol) &&
+ !CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL.equalsIgnoreCase(producerSecurityProtocol)) {
getKafkaDelegationTokenForBrokers(dag, conf, kafkaBrokers);
return true;
}