You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/02/24 09:55:44 UTC
[hive] branch master updated: HIVE-22898: CharsetDecoder race
condition in OrcRecordUpdater (Antal Sinkovits via Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d8cbd4f HIVE-22898: CharsetDecoder race condition in OrcRecordUpdater (Antal Sinkovits via Peter Vary)
d8cbd4f is described below
commit d8cbd4f2cb5a1b68112b974f8aa9e447795ab0d3
Author: Antal Sinkovits <as...@cloudera.com>
AuthorDate: Mon Feb 24 10:55:09 2020 +0100
HIVE-22898: CharsetDecoder race condition in OrcRecordUpdater (Antal Sinkovits via Peter Vary)
---
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 2 +-
.../hive/ql/io/orc/TestOrcRecordUpdater.java | 50 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 2d6a771..b9bcda9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -107,7 +107,6 @@ public class OrcRecordUpdater implements RecordUpdater {
final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024;
private static final Charset UTF8 = Charset.forName("UTF-8");
- private static final CharsetDecoder utf8Decoder = UTF8.newDecoder();
private final AcidOutputFormat.Options options;
private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
@@ -656,6 +655,7 @@ public class OrcRecordUpdater implements RecordUpdater {
ByteBuffer val =
reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)
.duplicate();
+ CharsetDecoder utf8Decoder = UTF8.newDecoder();
stripes = utf8Decoder.decode(val).toString().split(";");
} catch (CharacterCodingException e) {
throw new IllegalArgumentException("Bad string encoding for " +
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index ef6dbbb..06a2d8d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -21,11 +21,21 @@ package org.apache.hadoop.hive.ql.io.orc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -314,4 +324,44 @@ public class TestOrcRecordUpdater {
assertEquals(false, rows.hasNext());
}
+
+ /*
+ CharsetDecoder instances are not thread safe, so a shared instance can end up in an inconsistent state when
+ multiple buffers are decoded in parallel.
+ E.g:
+ java.lang.IllegalStateException: Current state = FLUSHED, new state = CODING_END
+ */
+  @Test
+  public void testConcurrentParseKeyIndex() throws Exception {
+    // Given: a reader whose ACID key index metadata is present
+    Reader mockReader = mock(Reader.class);
+    when(mockReader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)).thenReturn(true);
+
+    // Create a large buffer so every decode runs long enough for the calls to overlap
+    final StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 3000; i++) {
+      sb.append("100000,200000,300000;");
+    }
+    when(mockReader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)).thenReturn(
+        ByteBuffer.wrap(sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+
+    // When
+    // Hit OrcRecordUpdater.parseKeyIndex with large parallelism
+    final int parallelism = 4000;
+    Callable<RecordIdentifier[]> task = () -> OrcRecordUpdater.parseKeyIndex(mockReader);
+    List<Callable<RecordIdentifier[]>> tasks = java.util.Collections.nCopies(parallelism, task);
+    ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
+    try {
+      List<Future<RecordIdentifier[]>> res = executorService.invokeAll(tasks);
+
+      // Then
+      // Check for exceptions: Future.get() rethrows any task failure as an ExecutionException
+      for (Future<RecordIdentifier[]> ri : res) {
+        ri.get();
+      }
+    } finally {
+      // Always release the worker threads so they do not leak into the following tests
+      executorService.shutdownNow();
+    }
+  }
}