Posted to commits@hive.apache.org by sz...@apache.org on 2020/10/14 19:21:24 UTC
[hive] branch master updated: HIVE-24266: Committed rows in hflush'd ACID files may be missing from query result (Adam Szita, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5d1a7fa HIVE-24266: Committed rows in hflush'd ACID files may be missing from query result (Adam Szita, reviewed by Peter Vary)
5d1a7fa is described below
commit 5d1a7facf3dfc90a35611b7b5bcbd5c57e4e0dc9
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Wed Oct 14 21:21:00 2020 +0200
HIVE-24266: Committed rows in hflush'd ACID files may be missing from query result (Adam Szita, reviewed by Peter Vary)
---
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 42 ++++++-
.../hive/ql/io/orc/TestInputOutputFormat.java | 128 ++++++++++++++++++++-
2 files changed, 162 insertions(+), 8 deletions(-)
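In short, the fix changes how splits are generated on the block-location based (BI) path for ACID files whose committed (logical) length, taken from the _flush_length side file, differs from what is persisted in HDFS blocks: splits extending past logical EOF are trimmed, and the last split of an hflush'd, still-under-construction file is extended up to logical EOF. A minimal sketch of that calculation, with simplified illustrative names (not the exact Hive code):

import java.util.Map;
import java.util.TreeMap;

/**
 * Standalone sketch of the split-length logic introduced by HIVE-24266 (illustrative only).
 * blockOffsets maps each persisted block's start offset to its persisted length;
 * logicalLen is the committed length derived from the _flush_length side file.
 */
public class SplitLengthSketch {
  static TreeMap<Long, Long> splitLengths(TreeMap<Long, Long> blockOffsets,
                                          long logicalLen, boolean underConstruction) {
    TreeMap<Long, Long> splits = new TreeMap<>();
    for (Map.Entry<Long, Long> e : blockOffsets.entrySet()) {
      long offset = e.getKey();
      long length = e.getValue();
      if (offset > logicalLen) {
        break;                               // nothing past logical EOF; the map is sorted, so stop
      }
      long overshoot = (offset + length) - logicalLen;
      if (overshoot > 0) {
        length -= overshoot;                 // block extends past logical EOF: trim the split
      } else if (underConstruction && overshoot < 0 && offset == blockOffsets.lastKey()) {
        length -= overshoot;                 // hflush'd tail: extend the last split up to logical EOF
      }
      splits.put(offset, length);
    }
    return splits;
  }
}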
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0d028bc..b76f797 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io.orc;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.common.NoDynamicValuesException;
import org.apache.hadoop.fs.PathFilter;
@@ -1156,13 +1157,40 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
} else {
TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
- if (entry.getKey() + entry.getValue().getLength() > logicalLen) {
+ long blockOffset = entry.getKey();
+ long blockLength = entry.getValue().getLength();
+ if (blockOffset > logicalLen) {
//don't create splits for anything past logical EOF
- continue;
+ //the map is sorted by offset, so every later entry in this iteration also starts past logicalLen
+ break;
}
- OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
- entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
- deltas, -1, logicalLen, dir, offsetAndBucket);
+ long splitLength = blockLength;
+
+ long blockEndOvershoot = (blockOffset + blockLength) - logicalLen;
+ if (blockEndOvershoot > 0) {
+ // logicalLen falls within this block, so this (last) split should only cover the block's portion up to logicalLen
+ // -> we should read less than the persisted block end
+ splitLength -= blockEndOvershoot;
+ } else if (blockOffsets.lastKey() == blockOffset && blockEndOvershoot < 0) {
+ // This is the last block but it ends before logicalLen
+ // This can happen with HDFS if hflush was called and blocks are not persisted to disk yet, but content
+ // is otherwise available for readers, as DNs have these buffers in memory at this time.
+ // -> we should read more than (persisted) block end, but only within the block
+ if (fileStatus instanceof HdfsLocatedFileStatus) {
+ HdfsLocatedFileStatus hdfsFileStatus = (HdfsLocatedFileStatus)fileStatus;
+ if (hdfsFileStatus.getLocatedBlocks().isUnderConstruction()) {
+ // sanity check
+ if (logicalLen > blockOffset + hdfsFileStatus.getBlockSize()) {
+ throw new IOException("Side file indicates more data available after the last known block!");
+ }
+ // blockEndOvershoot is negative here...
+ splitLength -= blockEndOvershoot;
+ }
+ }
+ }
+ OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, blockOffset,
+ splitLength, entry.getValue().getHosts(), null, isOriginal, true,
+ deltas, -1, logicalLen, dir, offsetAndBucket);
splits.add(orcSplit);
}
}
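As a worked example of the arithmetic above (the numbers match the 2500/2800 case in the test added below): three blocks persisted at offsets 0, 1000 and 2000 with lengths 1000, 1000 and 500, plus a side file advertising 2800 committed bytes, should yield splits of 1000, 1000 and 800 bytes.

public class OvershootExample {
  public static void main(String[] args) {
    // Last persisted block of an hflush'd, under-construction file:
    long logicalLen = 2800;                                       // committed length from the side file
    long lastOffset = 2000, lastPersisted = 500;                  // only 500 bytes of this block are on disk
    long overshoot = (lastOffset + lastPersisted) - logicalLen;   // 2500 - 2800 = -300
    long splitLength = lastPersisted - overshoot;                 // 500 - (-300) = 800
    System.out.println(splitLength);                              // 800: the split covers all committed rows
  }
}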
@@ -1431,6 +1459,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
//this is the root of the partition in which the 'file' is located
private final Path rootDir;
OrcSplit.OffsetAndBucketProperty offsetAndBucket = null;
+ boolean isAcidTableScan;
public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException {
@@ -1452,6 +1481,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
this.deltaSplits = splitInfo.getSplits();
this.allowSyntheticFileIds = allowSyntheticFileIds;
this.ppdResult = splitInfo.ppdResult;
+ this.isAcidTableScan = AcidUtils.isFullAcidScan(context.conf);
}
public boolean isBlocking() {
@@ -1550,7 +1580,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
// scale the raw data size to split level based on ratio of split wrt to file length
- final long fileLen = file.getLen();
+ final long fileLen = isAcidTableScan ? AcidUtils.getLogicalLength(fs, file) : file.getLen();
final double splitRatio = (double) length / (double) fileLen;
final long scaledProjSize = projColsUncompressedSize > 0 ?
(long) (splitRatio * projColsUncompressedSize) : fileLen;
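The second OrcInputFormat hunk keeps the raw-data-size scaling consistent with the same notion of length: for a full ACID scan the split ratio is computed against the logical length instead of the on-disk length. A simplified sketch of that scaling (illustrative class and method names, not the exact Hive code):

public class ScaledSizeSketch {
  // Simplified version of the scaling in the hunk above. For an ACID scan, fileLen should be the
  // side-file-aware logical length; the raw on-disk length of an hflush'd file would skew the ratio.
  static long scaledProjectedSize(long splitLength, long fileLen, long projColsUncompressedSize) {
    double splitRatio = (double) splitLength / (double) fileLen;
    return projColsUncompressedSize > 0 ? (long) (splitRatio * projColsUncompressedSize) : fileLen;
  }
}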
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 4865df5..c51106c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.io.orc;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.DataInput;
import java.io.DataOutput;
@@ -36,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -47,6 +50,8 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -1070,14 +1075,23 @@ public class TestInputOutputFormat {
int length;
MockBlock[] blocks;
byte[] content;
+ // If true, this simulates an HDFS file that was hflush'd (i.e. it may have content beyond what is
+ // persisted in blocks on disk) and is still being written to (i.e. isUnderConstruction is true)
+ boolean isHdfsHflushed = false;
public MockFile(String path, int blockSize, byte[] content,
+ MockBlock... blocks) {
+ this(path, blockSize, content, false, blocks);
+ }
+
+ public MockFile(String path, int blockSize, byte[] content, boolean isHdfsHflushed,
MockBlock... blocks) {
this.path = new Path(path);
this.blockSize = blockSize;
this.blocks = blocks;
this.content = content;
this.length = content.length;
+ this.isHdfsHflushed = isHdfsHflushed;
int offset = 0;
for(MockBlock block: blocks) {
block.offset = offset;
@@ -1148,6 +1162,11 @@ public class TestInputOutputFormat {
}
return -1;
}
+
+ @Override
+ public int available() throws IOException {
+ return file.length - offset;
+ }
}
public static class MockPath extends Path {
@@ -1506,8 +1525,23 @@ public class TestInputOutputFormat {
private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException {
FileStatus fileStatus = createStatus(file);
- return new LocatedFileStatus(fileStatus,
- getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+ if (file.isHdfsHflushed) {
+ // Should behave the same way as the regular located status, except that it is of type
+ // HdfsLocatedFileStatus and has the isUnderConstruction flag set to true
+ LocatedBlocks lb = new LocatedBlocks(fileStatus.getLen(), true, null, null, false, null, null);
+ HdfsLocatedFileStatus mockStatus = mock(HdfsLocatedFileStatus.class);
+ when(mockStatus.getLocatedBlocks()).thenReturn(lb);
+ when(mockStatus.getPath()).thenReturn(fileStatus.getPath());
+ when(mockStatus.getLen()).thenReturn(fileStatus.getLen());
+ when(mockStatus.isDirectory()).thenReturn(false);
+ when(mockStatus.isFile()).thenReturn(true);
+ when(mockStatus.getBlockSize()).thenReturn(fileStatus.getBlockSize());
+ when(mockStatus.getBlockLocations()).thenReturn(getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(),
+ false));
+ return mockStatus;
+ } else {
+ return new LocatedFileStatus(fileStatus, getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+ }
}
private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException {
@@ -4221,4 +4255,94 @@ public class TestInputOutputFormat {
reader.close();
}
+
+
+ private static List<MockFile> mockDeltaWithSideFileForStreaming(String delta, int contentLength, int flush_length) {
+ final int blockSize = 1000;
+ boolean isDeltaHflushed = contentLength < flush_length;
+
+ List<MockFile> files = new LinkedList<>();
+
+ ByteBuffer bb = ByteBuffer.allocate(Long.BYTES);
+ bb.putLong(flush_length);
+ bb.array();
+
+ MockBlock[] blocks = new MockBlock[(contentLength / blockSize) + 1];
+ for (int i = 0; i < blocks.length; ++i) {
+ blocks[i] = new MockBlock("host1");
+ }
+
+ files.add(new MockFile("mock:/streaming/" + delta + "/bucket_00000", blockSize, new byte[contentLength], isDeltaHflushed,
+ blocks));
+ files.add(new MockFile("mock:/streaming/" + delta + "/bucket_00000_flush_length", blockSize, bb.array(), false,
+ new MockBlock("host1")));
+
+ return files;
+ }
+
+ private List<OrcSplit> splitsForStreamingAcidTable(List<MockFile> files) throws Exception {
+ try {
+ MockFileSystem fs = new MockFileSystem(conf);
+ files.forEach(f -> MockFileSystem.addGlobalFile(f));
+ conf.set("bucket_count", "1");
+ //set up props for read
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+ AcidUtils.setAcidOperationalProperties(conf, true, null);
+ conf.set(ValidTxnList.VALID_TXNS_KEY,
+ new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS,
+ TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnNamesProperty());
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES,
+ TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnTypesProperty());
+ conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+ MockPath mockPath = new MockPath(fs, "mock:/streaming");
+ conf.set("mapred.input.dir", mockPath.toString());
+ conf.set("fs.defaultFS", "mock:///");
+ conf.set("fs.mock.impl", MockFileSystem.class.getName());
+ conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+ OrcInputFormat.Context context = new OrcInputFormat.Context(conf, 0);
+
+ OrcInputFormat.FileGenerator gen =
+ new OrcInputFormat.FileGenerator(context, () -> fs, new MockPath(fs,
+ "mock:/streaming"),
+ false, null);
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+ return ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+ } finally {
+ MockFileSystem.clearGlobalFiles();
+ }
+ }
+
+ @Test
+ public void testAcidTableStreamingBISplitGeneration() throws Exception {
+ List<OrcSplit> result = null;
+ List<MockFile> files = new LinkedList<>();
+
+ // 1 complete delta file + 1 incomplete where more rows were committed than written to disk
+ // (1000) + (15/95)
+ files.addAll(mockDeltaWithSideFileForStreaming("delta_0000001_0000010_0000", 1000, 1000));
+ files.addAll(mockDeltaWithSideFileForStreaming("delta_0000011_0000020_0000", 15, 95));
+ result = splitsForStreamingAcidTable(files);
+ files.clear();
+ assertEquals(1000, result.get(0).getLength());
+ assertEquals(95, result.get(1).getLength());
+
+ // 1 incomplete delta with 2 complete and 1 incomplete blocks: (1000 + 1000 + 500/800)
+ files.addAll(mockDeltaWithSideFileForStreaming("delta_0000021_0000030_0000", 2500, 2800));
+ result = splitsForStreamingAcidTable(files);
+ files.clear();
+ assertEquals(1000, result.get(0).getLength());
+ assertEquals(1000, result.get(1).getLength());
+ assertEquals(800, result.get(2).getLength());
+
+ // 1 complete delta but with a shorter flush_length - unlikely in practice, covered for completeness
+ files.addAll(mockDeltaWithSideFileForStreaming("delta_0000021_0000030_0000", 1000, 450));
+ result = splitsForStreamingAcidTable(files);
+ files.clear();
+ assertEquals(450, result.get(0).getLength());
+
+ }
}
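For reference, the mock _flush_length side file written by mockDeltaWithSideFileForStreaming above holds the committed length as a single big-endian 8-byte long (ByteBuffer.putLong). A minimal sketch of reading that value back under the same assumption (the production reader in AcidUtils handles more cases; FlushLengthSketch and readFlushLength are illustrative names):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class FlushLengthSketch {
  // Reads the committed ("flushed") length from a side file laid out the way the test helper writes it:
  // one big-endian 8-byte long. ByteBuffer.putLong is big-endian by default, matching DataInputStream.readLong.
  static long readFlushLength(InputStream sideFile) throws IOException {
    try (DataInputStream in = new DataInputStream(sideFile)) {
      return in.readLong();
    }
  }
}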