You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/04/03 21:48:25 UTC

[hive] branch master updated: HIVE-21509: LLAP may cache corrupted column vectors and return wrong query result (Adam Szita via Slim Bouguerra)

This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a500116  HIVE-21509: LLAP may cache corrupted column vectors and return wrong query result (Adam Szita via Slim Bouguerra)
a500116 is described below

commit a500116daf9ba0a753226836dd7b49d883e1ca18
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Wed Apr 3 14:46:54 2019 -0700

    HIVE-21509: LLAP may cache corrupted column vectors and return wrong query result (Adam Szita via Slim Bouguerra)
---
 .../hive/llap/io/decode/EncodedDataConsumer.java   |   7 ++
 .../io/encoded/VectorDeserializeOrcWriter.java     |  15 ++-
 .../io/encoded/TestVectorDeserializeOrcWriter.java | 134 +++++++++++++++++++++
 .../hadoop/hive/ql/exec/vector/ColumnVector.java   |  22 ++++
 4 files changed, 176 insertions(+), 2 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index f2d2832..84436bc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.encoded.TezCounterSource;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
 import org.apache.hive.common.util.FixedSizedObjectPool;
@@ -153,6 +154,12 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
 
   @Override
   public void returnData(ColumnVectorBatch data) {
+    //In case a writer has a lock on any of the vectors we don't return it to the pool.
+    for (ColumnVector cv : data.cols) {
+      if (cv != null && cv.getRef() > 0) {
+        return;
+      }
+    }
     cvbPool.offer(data);
   }
 
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
index ca6d696..c100d6e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
@@ -369,11 +369,17 @@ class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
     destinationBatch.endOfFile = sourceBatch.endOfFile;
   }
 
-  private void addBatchToWriter() throws IOException {
+  void addBatchToWriter() throws IOException {
     propagateSourceBatchFieldsToDest();
     if (!isAsync) {
       orcWriter.addRowBatch(destinationBatch);
     } else {
+      //Lock ColumnVectors so we don't accidentally reset them before they're written out
+      for (ColumnVector cv : destinationBatch.cols) {
+        if (cv != null) {
+          cv.incRef();
+        }
+      }
       currentBatches.add(destinationBatch);
       addWriteOp(new VrbOperation(destinationBatch));
     }
@@ -431,7 +437,7 @@ class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
     return result;
   }
 
-  private static interface WriteOperation {
+  interface WriteOperation {
     boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException;
   }
 
@@ -447,6 +453,11 @@ class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
     public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
       // LlapIoImpl.LOG.debug("Writing batch " + batch);
       writer.addRowBatch(batch);
+      for (ColumnVector cv : batch.cols) {
+        if (cv != null) {
+          assert (cv.decRef() == 0);
+        }
+      }
       return false;
     }
   }
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java b/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java
new file mode 100644
index 0000000..ef7b1a3
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/io/encoded/TestVectorDeserializeOrcWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import java.util.ArrayList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+import org.apache.orc.impl.SchemaEvolution;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+import static org.mockito.internal.util.reflection.Whitebox.getInternalState;
+import static org.mockito.internal.util.reflection.Whitebox.setInternalState;
+
+/**
+ * Unit tests for VectorDeserializeOrcWriter.
+ */
+public class TestVectorDeserializeOrcWriter {
+
+  private static final int TEST_NUM_COLS = 2;
+
+  @Test
+  public void testConcurrencyIssueWhileWriting() throws Exception {
+
+    //Setup////////////////////////////////////////////////////////////////////////////////////////
+    EncodedDataConsumer consumer = createBlankEncodedDataConsumer();
+    FixedSizedObjectPool<ColumnVectorBatch> cvbPool = (FixedSizedObjectPool<ColumnVectorBatch>)
+            getInternalState(consumer, "cvbPool");
+
+    ColumnVectorBatch cvb = new ColumnVectorBatch(TEST_NUM_COLS);
+    VectorizedRowBatch vrb = new VectorizedRowBatch(TEST_NUM_COLS);
+    createTestVectors(cvb, vrb);
+
+    Queue<VectorDeserializeOrcWriter.WriteOperation> writeOpQueue = new ConcurrentLinkedQueue<>();
+    VectorDeserializeOrcWriter orcWriter = createOrcWriter(writeOpQueue, vrb);
+
+
+    //Simulating unfortunate order of events///////////////////////////////////////////////////////
+    //Add CVs to writer -> should increase their refcount
+    //Happens when IO thread has generated a vector batch and hands it over to async ORC thread
+    orcWriter.addBatchToWriter();
+
+    //Return CVs to pool -> should check their refcount, and as they're 1, this should be a no-op
+    //Happens when LLAPRecordReader on Tez thread received and used the batch and now wants to
+    // return it for CVB recycling
+    consumer.returnData(cvb);
+
+    //Do the write -> should decrease the refcount of CVs
+    //Happens when ORC thread gets to writing and hands the vectors of the batch over to ORC
+    // WriterImpl for encoding and cache storage
+    writeOpQueue.poll().apply(mock(WriterImpl.class), null);
+
+
+    //Verifications////////////////////////////////////////////////////////////////////////////////
+    //Pool should be empty as the CVB return should have been a no-op, so this call should create a
+    // NEW instance of CVBs
+    ColumnVectorBatch newCvb = cvbPool.take();
+    assertNotEquals(newCvb, cvb);
+
+    //Simulating a 'clean' CVB return -> the CVB now does have to make its way back to the pool
+    consumer.returnData(cvb);
+    newCvb = cvbPool.take();
+    assertEquals(newCvb, cvb);
+  }
+
+  private static void createTestVectors(ColumnVectorBatch cvb, VectorizedRowBatch vrb) {
+    for (int i = 0; i < TEST_NUM_COLS; ++i) {
+      LongColumnVector cv = new LongColumnVector();
+      cv.fill(i);
+      cvb.cols[i] = cv;
+      vrb.cols[i] = cv;
+    }
+  }
+
+  private static VectorDeserializeOrcWriter createOrcWriter(
+          Queue<VectorDeserializeOrcWriter.WriteOperation> writeOpQueue, VectorizedRowBatch vrb) {
+    VectorDeserializeOrcWriter orcWriter = mock(VectorDeserializeOrcWriter.class,
+            withSettings().defaultAnswer(CALLS_REAL_METHODS));
+    setInternalState(orcWriter, "sourceBatch", vrb);
+    setInternalState(orcWriter, "destinationBatch", vrb);
+    setInternalState(orcWriter, "currentBatches", new ArrayList<VectorizedRowBatch>());
+    setInternalState(orcWriter, "queue", writeOpQueue);
+    setInternalState(orcWriter, "isAsync", true);
+    return orcWriter;
+  }
+
+  private static EncodedDataConsumer createBlankEncodedDataConsumer() {
+    return new EncodedDataConsumer(null, 1, null, null) {
+      @Override
+      protected void decodeBatch(EncodedColumnBatch batch, Consumer downstreamConsumer)
+              throws InterruptedException {
+      }
+
+      @Override
+      public SchemaEvolution getSchemaEvolution() {
+        return null;
+      }
+
+      @Override
+      public void consumeData(Object data) throws InterruptedException {
+      }
+    };
+  }
+
+}
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index f2ad6d2..9f611df 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * ColumnVector contains the shared structure for the sub-types,
@@ -32,6 +33,10 @@ import java.util.Arrays;
  */
 public abstract class ColumnVector {
 
+
+  /** Reference count. */
+  private AtomicInteger refCount = new AtomicInteger(0);
+
   /**
    * The current kinds of column vectors.
    */
@@ -95,6 +100,7 @@ public abstract class ColumnVector {
    *  - sets isRepeating to false
    */
   public void reset() {
+    assert (refCount.get() == 0);
     if (!noNulls) {
       Arrays.fill(isNull, false);
     }
@@ -104,6 +110,21 @@ public abstract class ColumnVector {
     preFlattenIsRepeating = false;
   }
 
+
+  public final void incRef() {
+    refCount.incrementAndGet();
+  }
+
+  public final int getRef() {
+    return refCount.get();
+  }
+
+  public final int decRef() {
+    int i = refCount.decrementAndGet();
+    assert i >= 0;
+    return i;
+  }
+
   /**
    * Sets the isRepeating flag. Recurses over structs and unions so that the
    * flags are set correctly.
@@ -258,5 +279,6 @@ public abstract class ColumnVector {
     otherCv.isRepeating = isRepeating;
     otherCv.preFlattenIsRepeating = preFlattenIsRepeating;
     otherCv.preFlattenNoNulls = preFlattenNoNulls;
+    otherCv.refCount = refCount;
   }
 }