You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/02/28 07:00:36 UTC
[hive] branch master updated: HIVE-27105: Querying parquet table with zstd encryption is failing. (#4082). (Dmitriy Fingerman, reviewed by Ayush Saxena)
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1f2c38cf71e HIVE-27105: Querying parquet table with zstd encryption is failing. (#4082). (Dmitriy Fingerman, reviewed by Ayush Saxena)
1f2c38cf71e is described below
commit 1f2c38cf71e39326a8a56f3d09468e6ecbc6e0cc
Author: Dmitriy Fingerman <dm...@gmail.com>
AuthorDate: Tue Feb 28 02:00:30 2023 -0500
HIVE-27105: Querying parquet table with zstd encryption is failing. (#4082). (Dmitriy Fingerman, reviewed by Ayush Saxena)
---
kafka-handler/pom.xml | 2 +-
.../src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java | 5 +++--
.../src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java | 5 +++--
.../java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java | 6 +++---
.../src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java | 2 +-
pom.xml | 2 +-
ql/pom.xml | 1 +
7 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/kafka-handler/pom.xml b/kafka-handler/pom.xml
index 5a98f4fae76..c57ff71b3fd 100644
--- a/kafka-handler/pom.xml
+++ b/kafka-handler/pom.xml
@@ -114,7 +114,7 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
- <version>5.4.0</version>
+ <version>5.5.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index ffec69d8c1a..cd77f408e8e 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -310,7 +311,7 @@ import java.util.function.Predicate;
RetryUtils.CleanupAfterFailure cleanUpTheMap = new RetryUtils.CleanupAfterFailure() {
@Override public void cleanup() {
- producersMap.forEach((s, producer) -> producer.close(0, TimeUnit.MILLISECONDS));
+ producersMap.forEach((s, producer) -> producer.close(Duration.ofMillis(0)));
producersMap.clear();
}
};
@@ -346,7 +347,7 @@ import java.util.function.Predicate;
RetryUtils.retry(commitTask, isRetrayable, maxTries);
} catch (Exception e) {
// at this point we are in a funky state if one commit happend!! close and log it
- producersMap.forEach((key, producer) -> producer.close(0, TimeUnit.MILLISECONDS));
+ producersMap.forEach((key, producer) -> producer.close(Duration.ofMillis(0)));
LOG.error("Commit transaction failed", e);
if (committedTx.size() > 0) {
LOG.error("Partial Data Got Commited Some actions need to be Done");
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
index b0db0ad0550..f7ba2f50394 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -120,7 +121,7 @@ class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<B
@Override public void close(boolean abort) throws IOException {
if (abort) {
LOG.info("Aborting is set to TRUE, Closing writerId [{}] without flush.", writerId);
- producer.close(0, TimeUnit.MICROSECONDS);
+ producer.close(Duration.ofMillis(0));
return;
} else {
LOG.info("Flushing Kafka Producer with writerId [{}]", writerId);
@@ -159,7 +160,7 @@ class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<B
private void checkExceptions() throws IOException {
if (sendExceptionRef.get() != null) {
LOG.error("Send Exception Aborting write from writerId [{}]", writerId);
- producer.close(0, TimeUnit.MICROSECONDS);
+ producer.close(Duration.ofMillis(0));
throw new IOException(sendExceptionRef.get());
}
}
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
index e4015fcbba9..294cac80971 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
@@ -126,7 +126,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
producer.abortTransaction();
}
LOG.error("Closing writer [{}] caused by ERROR [{}]", producer.getTransactionalId(), exception.getMessage());
- producer.close(0, TimeUnit.MILLISECONDS);
+ producer.close(DURATION_0);
throw exception;
}
writerIdTopicId = String.format("WriterId [%s], Kafka Topic [%s]", producer.getTransactionalId(), topic);
@@ -151,7 +151,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
// producer.send() may throw a KafkaException which wraps a FencedException re throw its wrapped inner cause.
producer.abortTransaction();
}
- producer.close(0, TimeUnit.MILLISECONDS);
+ producer.close(DURATION_0);
sendExceptionRef.compareAndSet(null, e);
checkExceptions();
}
@@ -278,7 +278,7 @@ class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordW
producer.abortTransaction();
}
LOG.error("Closing writer [{}] caused by ERROR [{}]", writerIdTopicId, exception.getMessage());
- producer.close(0, TimeUnit.MILLISECONDS);
+ producer.close(DURATION_0);
throw new IOException(exception);
}
}
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index 287fff4cb26..e2f736619c0 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -74,7 +74,7 @@ class KafkaBrokerResource extends ExternalResource {
kafkaServer.zkClient();
adminZkClient = new AdminZkClient(kafkaServer.zkClient());
LOG.info("Creating kafka TOPIC [{}]", TOPIC);
- adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
}
/**
diff --git a/pom.xml b/pom.xml
index c2a2d9f6f52..363047a1c79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
<junit.version>4.13.2</junit.version>
<junit.jupiter.version>5.6.2</junit.jupiter.version>
<junit.vintage.version>5.6.3</junit.vintage.version>
- <kafka.version>2.5.0</kafka.version>
+ <kafka.version>3.4.0</kafka.version>
<kryo.version>5.0.3</kryo.version>
<reflectasm.version>1.11.9</reflectasm.version>
<kudu.version>1.12.0</kudu.version>
diff --git a/ql/pom.xml b/ql/pom.xml
index e65636058a5..be7a4bfe818 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -1101,6 +1101,7 @@
<include>com.esri.geometry:esri-geometry-api</include>
<include>org.roaringbitmap:RoaringBitmap</include>
<include>org.roaringbitmap:shims</include>
+ <include>com.github.luben:zstd-jni</include>
</includes>
</artifactSet>
<filters>