You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/02/28 07:00:36 UTC

[hive] branch master updated: HIVE-27105: Querying parquet table with zstd encryption is failing. (#4082). (Dmitriy Fingerman, reviewed by Ayush Saxena)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f2c38cf71e HIVE-27105: Querying parquet table with zstd encryption is failing. (#4082). (Dmitriy Fingerman, reviewed by Ayush Saxena)
1f2c38cf71e is described below

commit 1f2c38cf71e39326a8a56f3d09468e6ecbc6e0cc
Author: Dmitriy Fingerman <dm...@gmail.com>
AuthorDate: Tue Feb 28 02:00:30 2023 -0500

    HIVE-27105: Querying parquet table with zstd encryption is failing. (#4082). (Dmitriy Fingerman, reviewed by Ayush Saxena)
---
 kafka-handler/pom.xml                                               | 2 +-
 .../src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java  | 5 +++--
 .../src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java    | 5 +++--
 .../java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java | 6 +++---
 .../src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java  | 2 +-
 pom.xml                                                             | 2 +-
 ql/pom.xml                                                          | 1 +
 7 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/kafka-handler/pom.xml b/kafka-handler/pom.xml
index 5a98f4fae76..c57ff71b3fd 100644
--- a/kafka-handler/pom.xml
+++ b/kafka-handler/pom.xml
@@ -114,7 +114,7 @@
     <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-avro-serializer</artifactId>
-      <version>5.4.0</version>
+      <version>5.5.0</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index ffec69d8c1a..cd77f408e8e 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -310,7 +311,7 @@ import java.util.function.Predicate;
 
     RetryUtils.CleanupAfterFailure cleanUpTheMap = new RetryUtils.CleanupAfterFailure() {
       @Override public void cleanup() {
-        producersMap.forEach((s, producer) -> producer.close(0, TimeUnit.MILLISECONDS));
+        producersMap.forEach((s, producer) -> producer.close(Duration.ofMillis(0)));
         producersMap.clear();
       }
     };
@@ -346,7 +347,7 @@ import java.util.function.Predicate;
       RetryUtils.retry(commitTask, isRetrayable, maxTries);
     } catch (Exception e) {
       // at this point we are in a funky state if one commit happend!! close and log it
-      producersMap.forEach((key, producer) -> producer.close(0, TimeUnit.MILLISECONDS));
+      producersMap.forEach((key, producer) -> producer.close(Duration.ofMillis(0)));
       LOG.error("Commit transaction failed", e);
       if (committedTx.size() > 0) {
         LOG.error("Partial Data Got Commited Some actions need to be Done");
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
index b0db0ad0550..f7ba2f50394 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -120,7 +121,7 @@ class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<B
   @Override public void close(boolean abort) throws IOException {
     if (abort) {
       LOG.info("Aborting is set to TRUE, Closing writerId [{}] without flush.", writerId);
-      producer.close(0, TimeUnit.MICROSECONDS);
+      producer.close(Duration.ofMillis(0));
       return;
     } else {
       LOG.info("Flushing Kafka Producer with writerId [{}]", writerId);
@@ -159,7 +160,7 @@ class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<B
   private void checkExceptions() throws IOException {
     if (sendExceptionRef.get() != null) {
       LOG.error("Send Exception Aborting write from writerId [{}]", writerId);
-      producer.close(0, TimeUnit.MICROSECONDS);
+      producer.close(Duration.ofMillis(0));
       throw new IOException(sendExceptionRef.get());
     }
   }
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
index e4015fcbba9..294cac80971 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
@@ -126,7 +126,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
         producer.abortTransaction();
       }
       LOG.error("Closing writer [{}] caused by ERROR [{}]", producer.getTransactionalId(), exception.getMessage());
-      producer.close(0, TimeUnit.MILLISECONDS);
+      producer.close(DURATION_0);
       throw exception;
     }
     writerIdTopicId = String.format("WriterId [%s], Kafka Topic [%s]", producer.getTransactionalId(), topic);
@@ -151,7 +151,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
         // producer.send() may throw a KafkaException which wraps a FencedException re throw its wrapped inner cause.
         producer.abortTransaction();
       }
-      producer.close(0, TimeUnit.MILLISECONDS);
+      producer.close(DURATION_0);
       sendExceptionRef.compareAndSet(null, e);
       checkExceptions();
     }
@@ -278,7 +278,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
         producer.abortTransaction();
       }
       LOG.error("Closing writer [{}] caused by ERROR [{}]", writerIdTopicId, exception.getMessage());
-      producer.close(0, TimeUnit.MILLISECONDS);
+      producer.close(DURATION_0);
       throw new IOException(exception);
     }
   }
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index 287fff4cb26..e2f736619c0 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -74,7 +74,7 @@ class KafkaBrokerResource extends ExternalResource {
     kafkaServer.zkClient();
     adminZkClient = new AdminZkClient(kafkaServer.zkClient());
     LOG.info("Creating kafka TOPIC [{}]", TOPIC);
-    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
   }
 
   /**
diff --git a/pom.xml b/pom.xml
index c2a2d9f6f52..363047a1c79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
     <junit.version>4.13.2</junit.version>
     <junit.jupiter.version>5.6.2</junit.jupiter.version>
     <junit.vintage.version>5.6.3</junit.vintage.version>
-    <kafka.version>2.5.0</kafka.version>
+    <kafka.version>3.4.0</kafka.version>
     <kryo.version>5.0.3</kryo.version>
     <reflectasm.version>1.11.9</reflectasm.version>
     <kudu.version>1.12.0</kudu.version>
diff --git a/ql/pom.xml b/ql/pom.xml
index e65636058a5..be7a4bfe818 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -1101,6 +1101,7 @@
                   <include>com.esri.geometry:esri-geometry-api</include>
                   <include>org.roaringbitmap:RoaringBitmap</include>
                   <include>org.roaringbitmap:shims</include>
+                  <include>com.github.luben:zstd-jni</include>
                 </includes>
               </artifactSet>
               <filters>