You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2020/10/14 19:21:24 UTC

[hive] branch master updated: HIVE-24266: Committed rows in hflush'd ACID files may be missing from query result (Adam Szita, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d1a7fa  HIVE-24266: Committed rows in hflush'd ACID files may be missing from query result (Adam Szita, reviewed by Peter Vary)
5d1a7fa is described below

commit 5d1a7facf3dfc90a35611b7b5bcbd5c57e4e0dc9
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Wed Oct 14 21:21:00 2020 +0200

    HIVE-24266: Committed rows in hflush'd ACID files may be missing from query result (Adam Szita, reviewed by Peter Vary)
---
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java      |  42 ++++++-
 .../hive/ql/io/orc/TestInputOutputFormat.java      | 128 ++++++++++++++++++++-
 2 files changed, 162 insertions(+), 8 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0d028bc..b76f797 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.NoDynamicValuesException;
 import org.apache.hadoop.fs.PathFilter;
@@ -1156,13 +1157,40 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           } else {
             TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
             for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
-              if (entry.getKey() + entry.getValue().getLength() > logicalLen) {
+              long blockOffset = entry.getKey();
+              long blockLength = entry.getValue().getLength();
+              if (blockOffset > logicalLen) {
                 //don't create splits for anything past logical EOF
-                continue;
+                //map is ordered, thus any possible entry in the iteration after this is bound to be > logicalLen
+                break;
               }
-              OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
-                entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
-                deltas, -1, logicalLen, dir, offsetAndBucket);
+              long splitLength = blockLength;
+
+              long blockEndOvershoot = (blockOffset + blockLength) - logicalLen;
+              if (blockEndOvershoot > 0) {
+                // if logicalLen is placed within a block, we should make (this last) split out of the part of this block
+                // -> we should read less than block end
+                splitLength -= blockEndOvershoot;
+              } else if (blockOffsets.lastKey() == blockOffset && blockEndOvershoot < 0) {
+                // This is the last block but it ends before logicalLen
+                // This can happen with HDFS if hflush was called and blocks are not persisted to disk yet, but content
+                // is otherwise available for readers, as DNs have these buffers in memory at this time.
+                // -> we should read more than (persisted) block end, but only within the block
+                if (fileStatus instanceof HdfsLocatedFileStatus) {
+                  HdfsLocatedFileStatus hdfsFileStatus = (HdfsLocatedFileStatus)fileStatus;
+                  if (hdfsFileStatus.getLocatedBlocks().isUnderConstruction()) {
+                    // sanity check
+                    if (logicalLen > blockOffset + hdfsFileStatus.getBlockSize()) {
+                      throw new IOException("Side file indicates more data available after the last known block!");
+                    }
+                    // blockEndOvershoot is negative here...
+                    splitLength -= blockEndOvershoot;
+                  }
+                }
+              }
+              OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, blockOffset,
+                  splitLength, entry.getValue().getHosts(), null, isOriginal, true,
+                  deltas, -1, logicalLen, dir, offsetAndBucket);
               splits.add(orcSplit);
             }
           }
@@ -1431,6 +1459,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     //this is the root of the partition in which the 'file' is located
     private final Path rootDir;
     OrcSplit.OffsetAndBucketProperty offsetAndBucket = null;
+    boolean isAcidTableScan;
 
     public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
         boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException {
@@ -1452,6 +1481,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.deltaSplits = splitInfo.getSplits();
       this.allowSyntheticFileIds = allowSyntheticFileIds;
       this.ppdResult = splitInfo.ppdResult;
+      this.isAcidTableScan = AcidUtils.isFullAcidScan(context.conf);
     }
 
     public boolean isBlocking() {
@@ -1550,7 +1580,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
 
       // scale the raw data size to split level based on ratio of split wrt to file length
-      final long fileLen = file.getLen();
+      final long fileLen = isAcidTableScan ? AcidUtils.getLogicalLength(fs, file) : file.getLen();
       final double splitRatio = (double) length / (double) fileLen;
       final long scaledProjSize = projColsUncompressedSize > 0 ?
           (long) (splitRatio * projColsUncompressedSize) : fileLen;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 4865df5..c51106c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -36,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -47,6 +50,8 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -1070,14 +1075,23 @@ public class TestInputOutputFormat {
     int length;
     MockBlock[] blocks;
     byte[] content;
+    // If true - it will simulate an HDFS file that was hflushed (i.e. might have content above what can be found as
+    // first block on disk) and is still being written to (e.g. isUnderConstruction is true)
+    boolean isHdfsHflushed = false;
 
     public MockFile(String path, int blockSize, byte[] content,
+        MockBlock... blocks) {
+      this(path, blockSize, content, false, blocks);
+    }
+
+    public MockFile(String path, int blockSize, byte[] content, boolean isHdfsHflushed,
                     MockBlock... blocks) {
       this.path = new Path(path);
       this.blockSize = blockSize;
       this.blocks = blocks;
       this.content = content;
       this.length = content.length;
+      this.isHdfsHflushed = isHdfsHflushed;
       int offset = 0;
       for(MockBlock block: blocks) {
         block.offset = offset;
@@ -1148,6 +1162,11 @@ public class TestInputOutputFormat {
       }
       return -1;
     }
+
+    @Override
+    public int available() throws IOException {
+      return file.length - offset;
+    }
   }
 
   public static class MockPath extends Path {
@@ -1506,8 +1525,23 @@ public class TestInputOutputFormat {
 
     private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException {
       FileStatus fileStatus = createStatus(file);
-      return new LocatedFileStatus(fileStatus,
-          getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+      if (file.isHdfsHflushed) {
+        // Should work the same way as the local status except for having isUnderConstruction flag set to true and
+        // having HdfsLocatedFileStatus type
+        LocatedBlocks lb = new LocatedBlocks(fileStatus.getLen(), true, null, null, false, null, null);
+        HdfsLocatedFileStatus mockStatus = mock(HdfsLocatedFileStatus.class);
+        when(mockStatus.getLocatedBlocks()).thenReturn(lb);
+        when(mockStatus.getPath()).thenReturn(fileStatus.getPath());
+        when(mockStatus.getLen()).thenReturn(fileStatus.getLen());
+        when(mockStatus.isDirectory()).thenReturn(false);
+        when(mockStatus.isFile()).thenReturn(true);
+        when(mockStatus.getBlockSize()).thenReturn(fileStatus.getBlockSize());
+        when(mockStatus.getBlockLocations()).thenReturn(getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(),
+            false));
+        return mockStatus;
+      } else {
+        return new LocatedFileStatus(fileStatus, getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+      }
     }
 
     private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException {
@@ -4221,4 +4255,94 @@ public class TestInputOutputFormat {
 
     reader.close();
   }
+
+
+  private static List<MockFile> mockDeltaWithSideFileForStreaming(String delta, int contentLength, int flush_length) {
+    final int blockSize = 1000;
+    boolean isDeltaHflushed = contentLength < flush_length;
+
+    List<MockFile> files = new LinkedList<>();
+
+    ByteBuffer bb = ByteBuffer.allocate(Long.BYTES);
+    bb.putLong(flush_length);
+    bb.array();
+
+    MockBlock[] blocks = new MockBlock[(contentLength / blockSize) + 1];
+    for (int i = 0; i < blocks.length; ++i) {
+      blocks[i] = new MockBlock("host1");
+    }
+
+    files.add(new MockFile("mock:/streaming/" + delta + "/bucket_00000", blockSize, new byte[contentLength], isDeltaHflushed,
+        blocks));
+    files.add(new MockFile("mock:/streaming/" + delta + "/bucket_00000_flush_length", blockSize, bb.array(), false,
+        new MockBlock("host1")));
+
+    return files;
+  }
+
+  private List<OrcSplit> splitsForStreamingAcidTable(List<MockFile> files) throws Exception {
+    try {
+      MockFileSystem fs = new MockFileSystem(conf);
+      files.forEach(f -> MockFileSystem.addGlobalFile(f));
+      conf.set("bucket_count", "1");
+      //set up props for read
+      conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+      AcidUtils.setAcidOperationalProperties(conf, true, null);
+      conf.set(ValidTxnList.VALID_TXNS_KEY,
+          new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS,
+          TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnNamesProperty());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES,
+          TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnTypesProperty());
+      conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+      MockPath mockPath = new MockPath(fs, "mock:/streaming");
+      conf.set("mapred.input.dir", mockPath.toString());
+      conf.set("fs.defaultFS", "mock:///");
+      conf.set("fs.mock.impl", MockFileSystem.class.getName());
+      conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+      OrcInputFormat.Context context = new OrcInputFormat.Context(conf, 0);
+
+      OrcInputFormat.FileGenerator gen =
+          new OrcInputFormat.FileGenerator(context, () -> fs, new MockPath(fs,
+              "mock:/streaming"),
+              false, null);
+      List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+      assertEquals(1, splitStrategies.size());
+      assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+      return ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+    } finally {
+      MockFileSystem.clearGlobalFiles();
+    }
+  }
+
+  @Test
+  public void testAcidTableStreamingBISplitGeneration() throws Exception {
+    List<OrcSplit> result = null;
+    List<MockFile> files = new LinkedList<>();
+
+    // 1 complete delta file + 1 incomplete where more rows were committed than written to disk
+    // (1000) + (15/95)
+    files.addAll(mockDeltaWithSideFileForStreaming("delta_0000001_0000010_0000", 1000, 1000));
+    files.addAll(mockDeltaWithSideFileForStreaming("delta_0000011_0000020_0000", 15, 95));
+    result = splitsForStreamingAcidTable(files);
+    files.clear();
+    assertEquals(1000, result.get(0).getLength());
+    assertEquals(95, result.get(1).getLength());
+
+    // 1 incomplete delta with 2 complete and 1 incomplete blocks: (1000 + 1000 + 500/800)
+    files.addAll(mockDeltaWithSideFileForStreaming("delta_0000021_0000030_0000", 2500, 2800));
+    result = splitsForStreamingAcidTable(files);
+    files.clear();
+    assertEquals(1000, result.get(0).getLength());
+    assertEquals(1000, result.get(1).getLength());
+    assertEquals(800, result.get(2).getLength());
+
+    // 1 complete delta but shorter flush_length - though I think this is almost impossible
+    files.addAll(mockDeltaWithSideFileForStreaming("delta_0000021_0000030_0000", 1000, 450));
+    result = splitsForStreamingAcidTable(files);
+    files.clear();
+    assertEquals(450, result.get(0).getLength());
+
+  }
 }