Posted to commits@hive.apache.org by pv...@apache.org on 2020/02/24 09:55:44 UTC

[hive] branch master updated: HIVE-22898: CharsetDecoder race condition in OrcRecordUpdater (Antal Sinkovits via Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d8cbd4f  HIVE-22898: CharsetDecoder race condition in OrcRecordUpdater (Antal Sinkovits via Peter Vary)
d8cbd4f is described below

commit d8cbd4f2cb5a1b68112b974f8aa9e447795ab0d3
Author: Antal Sinkovits <as...@cloudera.com>
AuthorDate: Mon Feb 24 10:55:09 2020 +0100

    HIVE-22898: CharsetDecoder race condition in OrcRecordUpdater (Antal Sinkovits via Peter Vary)
---
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java    |  2 +-
 .../hive/ql/io/orc/TestOrcRecordUpdater.java       | 50 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 2d6a771..b9bcda9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -107,7 +107,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024;
 
   private static final Charset UTF8 = Charset.forName("UTF-8");
-  private static final CharsetDecoder utf8Decoder = UTF8.newDecoder();
 
   private final AcidOutputFormat.Options options;
   private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
@@ -656,6 +655,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       ByteBuffer val =
           reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
               .duplicate();
+      CharsetDecoder utf8Decoder = UTF8.newDecoder();
       stripes = utf8Decoder.decode(val).toString().split(";");
     } catch (CharacterCodingException e) {
       throw new IllegalArgumentException("Bad string encoding for " +
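For context on the failure mode: CharsetDecoder.decode(ByteBuffer) runs the decoder's reset/decode/flush sequence as a stateful operation, so a single decoder shared across threads (as the removed static field was) can observe interleaved state transitions. A minimal standalone sketch of the race, assuming nothing from Hive itself (the class name, pool size, iteration count, and payload are illustrative, not from the commit):

    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class SharedDecoderRace {
      // One decoder shared by all threads, mirroring the removed static field.
      private static final CharsetDecoder SHARED = Charset.forName("UTF-8").newDecoder();

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        byte[] payload = "100000,200000,300000;".getBytes();
        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
          // Concurrent decode() calls interleave the shared decoder's internal state machine.
          results.add(pool.submit(() -> SHARED.decode(ByteBuffer.wrap(payload)).toString()));
        }
        for (Future<String> f : results) {
          // Failures surface here, e.g. java.lang.IllegalStateException:
          // Current state = FLUSHED, new state = CODING_END (wrapped in ExecutionException).
          f.get();
        }
        pool.shutdown();
      }
    }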
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index ef6dbbb..06a2d8d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -21,11 +21,21 @@ package org.apache.hadoop.hive.ql.io.orc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -314,4 +324,44 @@ public class TestOrcRecordUpdater {
 
     assertEquals(false, rows.hasNext());
   }
+
+  /*
+    CharsetDecoder instances are not thread-safe, so a shared instance can end up in an
+    inconsistent state when decoding multiple buffers in parallel.
+    E.g.:
+    java.lang.IllegalStateException: Current state = FLUSHED, new state = CODING_END
+  */
+  @Test
+  public void testConcurrentParseKeyIndex() throws Exception {
+
+    // Given
+    Reader mockReader = mock(Reader.class);
+    when(mockReader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)).thenReturn(true);
+
+    // Create a large buffer
+    final StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 3000; i++) {
+      sb.append("100000,200000,300000;");
+    }
+    when(mockReader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)).thenReturn(
+            ByteBuffer.wrap(sb.toString().getBytes()));
+
+    // When
+    // Hit OrcRecordUpdater.parseKeyIndex with large parallelism
+    final int parallelism = 4000;
+    Callable<RecordIdentifier[]>[] r = new Callable[parallelism];
+    for (int i = 0; i < parallelism; i++) {
+      r[i] = () -> {
+        return OrcRecordUpdater.parseKeyIndex(mockReader);
+      };
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
+    List<Future<RecordIdentifier[]>> res = executorService.invokeAll(Arrays.asList(r));
+
+    // Then
+    // Check for exceptions
+    for (Future<RecordIdentifier[]> ri : res) {
+      ri.get();
+    }
+  }
 }
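The fix allocates a fresh decoder per parseKeyIndex() call, so each invocation owns its decoder state. As a hedged sketch of the trade-off (neither helper below exists in Hive; the names are illustrative), the same safety could also come from a per-thread decoder if per-call allocation were ever a concern:

    import java.nio.ByteBuffer;
    import java.nio.charset.CharacterCodingException;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;

    public class DecoderPatterns {
      private static final Charset UTF8 = Charset.forName("UTF-8");

      // Pattern adopted by the commit: a fresh decoder per call, no shared state.
      static String decodePerCall(ByteBuffer buf) throws CharacterCodingException {
        return UTF8.newDecoder().decode(buf).toString();
      }

      // Hypothetical alternative: one decoder per thread. reset() is required
      // before reuse, since decode() leaves the decoder in the FLUSHED state.
      private static final ThreadLocal<CharsetDecoder> DECODERS =
          ThreadLocal.withInitial(UTF8::newDecoder);

      static String decodePerThread(ByteBuffer buf) throws CharacterCodingException {
        return DECODERS.get().reset().decode(buf).toString();
      }
    }

Charset.decode(ByteBuffer) and new String(bytes, charset) avoid the same hazard as well, since both obtain a decoder that is not shared between callers.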