You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ab...@apache.org on 2020/01/28 16:33:50 UTC

[hive] branch master updated: HIVE-22393: HiveStreamingConnection: Exception in beginTransaction causes AbstractRecordWriter to throw NPE, covering up real exception (Matt Burgess via László Bodor)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 89c275e  HIVE-22393: HiveStreamingConnection: Exception in beginTransaction causes AbstractRecordWriter to throw NPE, covering up real exception (Matt Burgess via László Bodor)
89c275e is described below

commit 89c275e21395b28e4ed3a3dd317f67e3cd4bcc4f
Author: Matt Burgess <mb...@cloudera.com>
AuthorDate: Tue Jan 28 17:24:00 2020 +0100

    HIVE-22393: HiveStreamingConnection: Exception in beginTransaction causes AbstractRecordWriter to throw NPE, covering up real exception (Matt Burgess via László Bodor)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../apache/hive/streaming/AbstractRecordWriter.java | 10 +++++++---
 .../org/apache/hive/streaming/TestStreaming.java    | 21 +++++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 03c9fe0..fc9a2dd 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -366,7 +366,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   @Override
   public void close() throws StreamingIOFailure {
-    heapMemoryMonitor.close();
+    if(heapMemoryMonitor != null) {
+      heapMemoryMonitor.close();
+    }
     boolean haveError = false;
     String partition = null;
     if (LOG.isDebugEnabled()) {
@@ -395,7 +397,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
       logStats("Stats after close:");
     }
     try {
-      this.fs.close();
+      if(this.fs != null) {
+        this.fs.close();
+      }
     } catch (IOException e) {
       throw new StreamingIOFailure("Error while closing FileSystem", e);
     }
@@ -630,7 +634,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
       .filter(Objects::nonNull)
       .mapToLong(RecordUpdater::getBufferedRowCount)
       .sum();
-    MemoryUsage memoryUsage = heapMemoryMonitor.getTenuredGenMemoryUsage();
+    MemoryUsage memoryUsage = heapMemoryMonitor == null ? null : heapMemoryMonitor.getTenuredGenMemoryUsage();
     String oldGenUsage = "NA";
     if (memoryUsage != null) {
       oldGenUsage = "used/max => " + LlapUtil.humanReadableByteCount(memoryUsage.getUsed()) + "/" +
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 58b3ae2..35a220f 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -1819,6 +1819,27 @@ public class TestStreaming {
 
   }
 
+  @Test(expected = ClassCastException.class)
+  public void testFileSystemError() throws Exception {
+    // Bad file system object, ClassCastException should occur during record writer init
+    conf.set("fs.raw.impl", Object.class.getName());
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(partitionVals)
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withRecordWriter(writer)
+            .withHiveConf(conf)
+            .connect();
+
+    connection.beginTransaction();
+  }
+
 
   @Test
   public void testTransactionBatchAbortAndCommit() throws Exception {