You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/04/03 21:48:25 UTC
[hive] branch master updated: HIVE-21509: LLAP may cache corrupted
column vectors and return wrong query result (Adam Szita via Slim
Bouguerra)
This is an automated email from the ASF dual-hosted git repository.
bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a500116 HIVE-21509: LLAP may cache corrupted column vectors and return wrong query result (Adam Szita via Slim Bouguerra)
a500116 is described below
commit a500116daf9ba0a753226836dd7b49d883e1ca18
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Wed Apr 3 14:46:54 2019 -0700
HIVE-21509: LLAP may cache corrupted column vectors and return wrong query result (Adam Szita via Slim Bouguerra)
---
.../hive/llap/io/decode/EncodedDataConsumer.java | 7 ++
.../io/encoded/VectorDeserializeOrcWriter.java | 15 ++-
.../io/encoded/TestVectorDeserializeOrcWriter.java | 134 +++++++++++++++++++++
.../hadoop/hive/ql/exec/vector/ColumnVector.java | 22 ++++
4 files changed, 176 insertions(+), 2 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index f2d2832..84436bc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
import org.apache.hive.common.util.FixedSizedObjectPool;
@@ -153,6 +154,12 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
@Override
public void returnData(ColumnVectorBatch data) {
+ // Do not recycle the batch while any of its vectors still has a non-zero reference count (e.g. held by a writer).
+ for (ColumnVector cv : data.cols) {
+ if (cv != null && cv.getRef() > 0) {
+ return;
+ }
+ }
cvbPool.offer(data);
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
index ca6d696..c100d6e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
@@ -369,11 +369,17 @@ class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
destinationBatch.endOfFile = sourceBatch.endOfFile;
}
- private void addBatchToWriter() throws IOException {
+ void addBatchToWriter() throws IOException {
propagateSourceBatchFieldsToDest();
if (!isAsync) {
orcWriter.addRowBatch(destinationBatch);
} else {
+ //Lock ColumnVectors so we don't accidentally reset them before they're written out
+ for (ColumnVector cv : destinationBatch.cols) {
+ if (cv != null) {
+ cv.incRef();
+ }
+ }
currentBatches.add(destinationBatch);
addWriteOp(new VrbOperation(destinationBatch));
}
@@ -431,7 +437,7 @@ class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
return result;
}
- private static interface WriteOperation {
+ interface WriteOperation {
boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException;
}
@@ -447,6 +453,11 @@ class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
// LlapIoImpl.LOG.debug("Writing batch " + batch);
writer.addRowBatch(batch);
+ for (ColumnVector cv : batch.cols) {
+ // Release the reference outside the assert: with assertions disabled the asserted expression is never evaluated.
+ int remaining = (cv == null) ? 0 : cv.decRef();
+ assert remaining == 0;
+ }
return false;
}
}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java b/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java
new file mode 100644
index 0000000..ef7b1a3
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import java.util.ArrayList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.impl.SchemaEvolution;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+import static org.mockito.internal.util.reflection.Whitebox.getInternalState;
+import static org.mockito.internal.util.reflection.Whitebox.setInternalState;
+
+/**
+ * Unit tests for VectorDeserializeOrcWriter.
+ */
+public class TestVectorDeserializeOrcWriter {
+
+ private static final int TEST_NUM_COLS = 2;
+
+ @Test
+ public void testConcurrencyIssueWhileWriting() throws Exception {
+
+ //Setup////////////////////////////////////////////////////////////////////////////////////////
+ EncodedDataConsumer consumer = createBlankEncodedDataConsumer();
+ FixedSizedObjectPool<ColumnVectorBatch> cvbPool = (FixedSizedObjectPool<ColumnVectorBatch>)
+ getInternalState(consumer, "cvbPool");
+
+ ColumnVectorBatch cvb = new ColumnVectorBatch(TEST_NUM_COLS);
+ VectorizedRowBatch vrb = new VectorizedRowBatch(TEST_NUM_COLS);
+ createTestVectors(cvb, vrb);
+
+ Queue<VectorDeserializeOrcWriter.WriteOperation> writeOpQueue = new ConcurrentLinkedQueue<>();
+ VectorDeserializeOrcWriter orcWriter = createOrcWriter(writeOpQueue, vrb);
+
+
+ //Simulating unfortunate order of events///////////////////////////////////////////////////////
+ //Add CVs to writer -> should increase their refcount
+ //Happens when IO thread has generated a vector batch and hands it over to async ORC thread
+ orcWriter.addBatchToWriter();
+
+ //Return CVs to pool -> should check their refcount, and as they're 1, this should be a no-op
+ //Happens when LLAPRecordReader on Tez thread received and used the batch and now wants to
+ // return it for CVB recycling
+ consumer.returnData(cvb);
+
+ //Do the write -> should decrease the refcount of CVs
+ //Happens when ORC thread gets to writing and hands the vectors of the batch over to ORC
+ // WriterImpl for encoding and cache storage
+ writeOpQueue.poll().apply(mock(WriterImpl.class), null);
+
+
+ //Verifications////////////////////////////////////////////////////////////////////////////////
+ //Pool should be empty as the CVB return should have been a no-op, so this call should create a
+ // NEW instance of CVBs
+ ColumnVectorBatch newCvb = cvbPool.take();
+ assertNotEquals(newCvb, cvb);
+
+ //Simulating a 'clean' CVB return -> the CVB now does have to make its way back to the pool
+ consumer.returnData(cvb);
+ newCvb = cvbPool.take();
+ assertEquals(newCvb, cvb);
+ }
+
+ private static void createTestVectors(ColumnVectorBatch cvb, VectorizedRowBatch vrb) {
+ for (int i = 0; i < TEST_NUM_COLS; ++i) {
+ LongColumnVector cv = new LongColumnVector();
+ cv.fill(i);
+ cvb.cols[i] = cv;
+ vrb.cols[i] = cv;
+ }
+ }
+
+ private static VectorDeserializeOrcWriter createOrcWriter(
+ Queue<VectorDeserializeOrcWriter.WriteOperation> writeOpQueue, VectorizedRowBatch vrb) {
+ VectorDeserializeOrcWriter orcWriter = mock(VectorDeserializeOrcWriter.class,
+ withSettings().defaultAnswer(CALLS_REAL_METHODS));
+ setInternalState(orcWriter, "sourceBatch", vrb);
+ setInternalState(orcWriter, "destinationBatch", vrb);
+ setInternalState(orcWriter, "currentBatches", new ArrayList<VectorizedRowBatch>());
+ setInternalState(orcWriter, "queue", writeOpQueue);
+ setInternalState(orcWriter, "isAsync", true);
+ return orcWriter;
+ }
+
+ private static EncodedDataConsumer createBlankEncodedDataConsumer() {
+ return new EncodedDataConsumer(null, 1, null, null) {
+ @Override
+ protected void decodeBatch(EncodedColumnBatch batch, Consumer downstreamConsumer)
+ throws InterruptedException {
+ }
+
+ @Override
+ public SchemaEvolution getSchemaEvolution() {
+ return null;
+ }
+
+ @Override
+ public void consumeData(Object data) throws InterruptedException {
+ }
+ };
+ }
+
+}
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index f2ad6d2..9f611df 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.vector;
import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* ColumnVector contains the shared structure for the sub-types,
@@ -32,6 +33,10 @@ import java.util.Arrays;
*/
public abstract class ColumnVector {
+
+ /** Reference count. */
+ private AtomicInteger refCount = new AtomicInteger(0);
+
/**
* The current kinds of column vectors.
*/
@@ -95,6 +100,7 @@ public abstract class ColumnVector {
* - sets isRepeating to false
*/
public void reset() {
+ assert (refCount.get() == 0);
if (!noNulls) {
Arrays.fill(isNull, false);
}
@@ -104,6 +110,21 @@ public abstract class ColumnVector {
preFlattenIsRepeating = false;
}
+ /** Increments this vector's reference count by one. */
+ public final void incRef() {
+ refCount.incrementAndGet();
+ }
+ /** Returns the current reference count of this vector. */
+ public final int getRef() {
+ return refCount.get();
+ }
+ /** Decrements the reference count by one and returns the new value; asserts that it has not gone negative. */
+ public final int decRef() {
+ int i = refCount.decrementAndGet();
+ assert i >= 0;
+ return i;
+ }
+
/**
* Sets the isRepeating flag. Recurses over structs and unions so that the
* flags are set correctly.
@@ -258,5 +279,6 @@ public abstract class ColumnVector {
otherCv.isRepeating = isRepeating;
otherCv.preFlattenIsRepeating = preFlattenIsRepeating;
otherCv.preFlattenNoNulls = preFlattenNoNulls;
+ otherCv.refCount = refCount;
}
}