Posted to commits@hive.apache.org by ek...@apache.org on 2017/07/11 04:51:43 UTC
[2/2] hive git commit: HIVE-16177 non Acid to acid conversion doesn't
handle _copy_N files (Eugene Koifman, reviewed by Sergey Shelukhin)
HIVE-16177 non Acid to acid conversion doesn't handle _copy_N files (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/37f84ca0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/37f84ca0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/37f84ca0
Branch: refs/heads/master
Commit: 37f84ca096de15ee7a7a0779705e504fe9c96b4e
Parents: 6489352
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Jul 10 21:50:56 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Jul 10 21:51:20 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/Utilities.java | 2 +-
.../hadoop/hive/ql/io/AcidInputFormat.java | 5 +-
.../hadoop/hive/ql/io/AcidOutputFormat.java | 21 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 58 ++-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 63 +--
.../hive/ql/io/orc/OrcRawRecordMerger.java | 406 +++++++++++++++++--
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 9 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 2 +-
.../hive/ql/txn/compactor/CompactorMR.java | 9 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 54 ++-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 62 +++
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 19 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 18 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 28 +-
.../TestVectorizedOrcAcidRowBatchReader.java | 2 +-
.../queries/clientpositive/insert_orig_table.q | 20 +-
.../clientpositive/insert_values_orig_table.q | 20 +-
.../insert_values_orig_table_use_metadata.q | 23 +-
.../clientpositive/insert_orig_table.q.out | 68 +++-
.../insert_values_orig_table.q.out | 68 +++-
.../insert_values_orig_table_use_metadata.q.out | 104 ++++-
.../clientpositive/llap/insert_orig_table.q.out | 68 +++-
22 files changed, 977 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a1cf76b..ba97d22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1205,7 +1205,7 @@ public final class Utilities {
* Group 6: copy [copy keyword]
* Group 8: 2 [copy file index]
*/
- private static final String COPY_KEYWORD = "_copy_"; // copy keyword
+ public static final String COPY_KEYWORD = "_copy_"; // copy keyword
private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX =
Pattern.compile("^.*?"+ // any prefix
"([0-9]+)"+ // taskId
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index 7c7074d..25177ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -112,10 +112,13 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
private long minTxnId;
private long maxTxnId;
private List<Integer> stmtIds;
-
+
public DeltaMetaData() {
this(0,0,new ArrayList<Integer>());
}
+ /**
+ * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
+ */
DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
this.minTxnId = minTxnId;
this.maxTxnId = maxTxnId;
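
As a rough sketch of why stmtIds exist (the delta_%07d_%07d_%04d naming below is an
assumption about AcidUtils' directory convention, used for illustration only): a single
transaction issuing two inserts into the same partition writes two delta dirs that differ
only in the statement-id suffix.

public class DeltaDirDemo {
  // Hypothetical formatter mirroring the assumed delta dir naming convention.
  static String deltaSubdir(long minTxnId, long maxTxnId, int stmtId) {
    return String.format("delta_%07d_%07d_%04d", minTxnId, maxTxnId, stmtId);
  }
  public static void main(String[] args) {
    // txn 5 ran two statements against the same partition:
    for (int stmtId : new int[]{0, 1}) {
      System.out.println(deltaSubdir(5, 5, stmtId));
      // delta_0000005_0000005_0000 and delta_0000005_0000005_0001
    }
  }
}
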
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index b85b827..405cfde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -25,6 +25,7 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.WritableComparable;
@@ -51,6 +52,11 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
private long minimumTransactionId;
private long maximumTransactionId;
private int bucket;
+ /**
+ * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)}
+ * _copy_N starts with 1.
+ */
+ private int copyNumber = 0;
private PrintStream dummyStream = null;
private boolean oldStyle = false;
private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id
@@ -180,6 +186,18 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
}
/**
+ * Multiple inserts into legacy (pre-acid) tables can generate multiple copies of each bucket
+ * file.
+ * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD
+ * @param copyNumber the number of the copy ( > 0)
+ * @return this
+ */
+ public Options copyNumber(int copyNumber) {
+ this.copyNumber = copyNumber;
+ return this;
+ }
+
+ /**
* Whether it should use the old style (0000000_0) filenames.
* @param value should use the old style names
* @return this
@@ -293,6 +311,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
public int getStatementId() {
return statementId;
}
+ public int getCopyNumber() {
+ return copyNumber;
+ }
public Path getFinalDestination() {
return finalDestination;
}
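
A minimal usage sketch of the extended builder (assuming the Hive ql classes are on the
classpath; the file name is hypothetical), describing the pre-acid file 000001_0_copy_2
the same way the AcidUtils change below does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;

public class CopyNumberOptionsDemo {
  public static void main(String[] args) {
    AcidOutputFormat.Options options = new AcidOutputFormat.Options(new Configuration())
        .setOldStyle(true)        // legacy 000000_0-style file name
        .minimumTransactionId(0)  // original files predate any transaction
        .maximumTransactionId(0)
        .bucket(1)                // from the 000001 prefix
        .copyNumber(2)            // from the _copy_2 suffix; 0 means no suffix
        .writingBase(true);
    System.out.println("copyNumber=" + options.getCopyNumber()); // 2
  }
}
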
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index da00bb3..1c03736 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -48,6 +49,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
+
/**
* Utilities that are shared by all of the ACID input and output formats. They
* are used by the compactor and cleaner and thus must be format agnostic.
@@ -99,6 +102,10 @@ public class AcidUtils {
public static final int MAX_STATEMENTS_PER_TXN = 10000;
public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
+ /**
+ * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read
+ * a "delta" dir written by a real Acid write - cannot have any copies
+ */
public static final PathFilter originalBucketFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
@@ -113,6 +120,11 @@ public class AcidUtils {
private static final Pattern ORIGINAL_PATTERN =
Pattern.compile("[0-9]+_[0-9]+");
+ /**
+ * @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD
+ */
+ private static final Pattern ORIGINAL_PATTERN_COPY =
+ Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+");
public static final PathFilter hiddenFileFilter = new PathFilter(){
@Override
@@ -243,7 +255,21 @@ public class AcidUtils {
.maximumTransactionId(0)
.bucket(bucket)
.writingBase(true);
- } else if (filename.startsWith(BUCKET_PREFIX)) {
+ }
+ else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
+ //todo: define groups in regex and use parseInt(Matcher.group(2))....
+ int bucket =
+ Integer.parseInt(filename.substring(0, filename.indexOf('_')));
+ int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
+ result
+ .setOldStyle(true)
+ .minimumTransactionId(0)
+ .maximumTransactionId(0)
+ .bucket(bucket)
+ .copyNumber(copyNumber)
+ .writingBase(true);
+ }
+ else if (filename.startsWith(BUCKET_PREFIX)) {
int bucket =
Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
@@ -482,7 +508,7 @@ public class AcidUtils {
Path getBaseDirectory();
/**
- * Get the list of original files. Not {@code null}.
+ * Get the list of original files. Not {@code null}. Must be sorted.
* @return the list of original files (eg. 000000_0)
*/
List<HdfsFileStatusWithId> getOriginalFiles();
@@ -825,7 +851,7 @@ public class AcidUtils {
// Okay, we're going to need these originals. Recurse through them and figure out what we
// really need.
for (FileStatus origDir : originalDirectories) {
- findOriginals(fs, origDir, original, useFileIds);
+ findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles);
}
}
@@ -893,7 +919,17 @@ public class AcidUtils {
final Path base = bestBase.status == null ? null : bestBase.status.getPath();
LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
deltas.size());
-
+ /**
+ * If this sort order is changed and there are tables that have been converted to transactional
+ * and have had any update/delete/merge operations performed but not yet MAJOR compacted, it
+ * may result in data loss since it may change how
+ * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns
+ * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen).
+ */
+ Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> {
+ //this does "Path.uri.compareTo(that.uri)"
+ return o1.getFileStatus().compareTo(o2.getFileStatus());
+ });
return new Directory(){
@Override
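
To see why the fixed order matters, a small stand-alone illustration of the comparator's
effect (plain String comparison stands in for Path.uri.compareTo):

import java.util.Arrays;
import java.util.List;

public class OriginalSortDemo {
  public static void main(String[] args) {
    List<String> paths = Arrays.asList(
        "mock:/tbl/part1/000001_0",
        "mock:/tbl/part1/000000_0_copy_2",
        "mock:/tbl/part1/000000_0",
        "mock:/tbl/part1/000000_0_copy_1");
    paths.sort(String::compareTo); // stand-in for Path.uri.compareTo(that.uri)
    paths.forEach(System.out::println);
    // 000000_0, 000000_0_copy_1, 000000_0_copy_2, 000001_0 -- each copy follows
    // its base file, giving a stable rowId sequence per logical bucket on every read.
  }
}
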
@@ -1011,7 +1047,7 @@ public class AcidUtils {
* @throws IOException
*/
private static void findOriginals(FileSystem fs, FileStatus stat,
- List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds) throws IOException {
+ List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles) throws IOException {
assert stat.isDir();
List<HdfsFileStatusWithId> childrenWithId = null;
Boolean val = useFileIds.value;
@@ -1031,18 +1067,22 @@ public class AcidUtils {
if (childrenWithId != null) {
for (HdfsFileStatusWithId child : childrenWithId) {
if (child.getFileStatus().isDir()) {
- findOriginals(fs, child.getFileStatus(), original, useFileIds);
+ findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles);
} else {
- original.add(child);
+ if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) {
+ original.add(child);
+ }
}
}
} else {
List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter);
for (FileStatus child : children) {
if (child.isDir()) {
- findOriginals(fs, child, original, useFileIds);
+ findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles);
} else {
- original.add(createOriginalObj(null, child));
+ if(!ignoreEmptyFiles || child.getLen() > 0) {
+ original.add(createOriginalObj(null, child));
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0ef7c75..f9e17a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1677,7 +1677,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// independent split strategies for them. There is a global flag 'isOriginal' that is set
// on a per split strategy basis and it has to be same for all the files in that strategy.
List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
- adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
+ adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
allowSyntheticFileIds);
for (SplitStrategy<?> splitStrategy : splitStrategies) {
@@ -1927,7 +1927,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
} else {
root = path.getParent().getParent();
}
- } else {
+ } else {//here path is a delta/ but above it's a partition/
root = path;
}
@@ -1947,7 +1947,23 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final Configuration conf = options.getConfiguration();
final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
- final int bucket = OrcInputFormat.getBucketForSplit(conf, split);
+ OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
+ mergerOptions.rootPath(root);
+ final int bucket;
+ if (split.hasBase()) {
+ AcidOutputFormat.Options acidIOOptions =
+ AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf);
+ if(acidIOOptions.getBucket() < 0) {
+ LOG.warn("Can't determine bucket ID for " + split.getPath() + "; ignoring");
+ }
+ bucket = acidIOOptions.getBucket();
+ if(split.isOriginal()) {
+ mergerOptions.copyIndex(acidIOOptions.getCopyNumber()).bucketPath(split.getPath());
+ }
+ } else {
+ bucket = (int) split.getStart();
+ }
+
final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
readOptions.range(split.getStart(), split.getLength());
@@ -1956,7 +1972,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
new ValidReadTxnList(txnString);
final OrcRawRecordMerger records =
new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
- validTxnList, readOptions, deltas);
+ validTxnList, readOptions, deltas, mergerOptions);
return new RowReader<OrcStruct>() {
OrcStruct innerRecord = records.createValue();
@@ -2008,20 +2024,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
};
}
-
- static Path findOriginalBucket(FileSystem fs,
+ private static Path findOriginalBucket(FileSystem fs,
Path directory,
int bucket) throws IOException {
for(FileStatus stat: fs.listStatus(directory)) {
- String name = stat.getPath().getName();
- String numberPart = name.substring(0, name.indexOf('_'));
- if (org.apache.commons.lang3.StringUtils.isNumeric(numberPart) &&
- Integer.parseInt(numberPart) == bucket) {
+ if(stat.getLen() <= 0) {
+ continue;
+ }
+ AcidOutputFormat.Options bucketInfo =
+ AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf());
+ if(bucketInfo.getBucket() == bucket) {
return stat.getPath();
}
}
- throw new IllegalArgumentException("Can't find bucket " + bucket + " in " +
- directory);
+ throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory);
}
static Reader.Options createOptionsForReader(Configuration conf) {
@@ -2054,14 +2070,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return reader;
}
- static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) {
- if (orcSplit.hasBase()) {
- return AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
- } else {
- return (int) orcSplit.getStart();
- }
- }
-
public static boolean[] pickStripesViaTranslatedSarg(SearchArgument sarg,
OrcFile.WriterVersion writerVersion, List<OrcProto.Type> types,
List<StripeStatistics> stripeStats, int stripeCount) {
@@ -2134,7 +2142,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@VisibleForTesting
static List<SplitStrategy<?>> determineSplitStrategies(CombinedCtx combinedCtx, Context context,
- FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
+ FileSystem fs, Path dir,
List<AcidBaseFileInfo> baseFiles,
List<ParsedDelta> parsedDeltas,
List<OrcProto.Type> readerTypes,
@@ -2145,7 +2153,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// When no baseFiles, we will just generate a single split strategy and return.
List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
if (baseFiles.isEmpty()) {
- splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+ splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir,
acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
if (splitStrategy != null) {
splitStrategies.add(splitStrategy);
@@ -2165,7 +2173,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// Generate split strategy for non-acid schema original files, if any.
if (!originalSchemaFiles.isEmpty()) {
- splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+ splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir,
originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
if (splitStrategy != null) {
splitStrategies.add(splitStrategy);
@@ -2174,7 +2182,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// Generate split strategy for acid schema files, if any.
if (!acidSchemaFiles.isEmpty()) {
- splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+ splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir,
acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
if (splitStrategy != null) {
splitStrategies.add(splitStrategy);
@@ -2186,7 +2194,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@VisibleForTesting
static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
- FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
+ FileSystem fs, Path dir,
List<HdfsFileStatusWithId> baseFiles,
boolean isOriginal,
List<ParsedDelta> parsedDeltas,
@@ -2261,8 +2269,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
}
+ OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
+ .isCompacting(true)
+ .rootPath(baseDirectory);
return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
- bucket, validTxnList, new Reader.Options(), deltaDirectory);
+ bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 95b8806..ffcdf6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.orc.OrcUtils;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
@@ -59,11 +62,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final long length;
private final ValidTxnList validTxnList;
private final int columns;
- private ReaderKey prevKey = new ReaderKey();
+ private final ReaderKey prevKey = new ReaderKey();
// this is the key less than the lowest key we need to process
- private RecordIdentifier minKey;
+ private final RecordIdentifier minKey;
// this is the last key we need to process
- private RecordIdentifier maxKey;
+ private final RecordIdentifier maxKey;
// an extra value so that we can return it while reading ahead
private OrcStruct extraValue;
@@ -156,7 +159,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
}
- public long getCurrentTransactionId() {
+ long getCurrentTransactionId() {
return currentTransactionId;
}
@@ -165,7 +168,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* @param other the value to compare to
* @return -1, 0, +1
*/
- public int compareRow(RecordIdentifier other) {
+ int compareRow(RecordIdentifier other) {
return compareToInternal(other);
}
@@ -188,9 +191,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
final Reader reader;
final RecordReader recordReader;
final ReaderKey key;
- final RecordIdentifier maxKey;
+ private final RecordIdentifier minKey;
+ private final RecordIdentifier maxKey;
final int bucket;
private final int statementId;
+ boolean advancedToMinKey = false;
/**
* Create a reader that reads from the first key larger than minKey to any
@@ -210,21 +215,33 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
ReaderImpl.Options options, int statementId) throws IOException {
this.reader = reader;
this.key = key;
+ this.minKey = minKey;
this.maxKey = maxKey;
this.bucket = bucket;
// TODO use stripe statistics to jump over stripes
recordReader = reader.rowsOptions(options);
this.statementId = statementId;
+ }
+ RecordReader getRecordReader() {
+ return recordReader;
+ }
+ /**
+ * This must be called right after the constructor but not in the constructor to make sure
+ * sub-classes are fully initialized before their {@link #next(OrcStruct)} is called
+ */
+ void advanceToMinKey() throws IOException {
+ advancedToMinKey = true;
// advance the reader until we reach the minimum key
do {
next(nextRecord);
} while (nextRecord != null &&
- (minKey != null && key.compareRow(minKey) <= 0));
+ (getMinKey() != null && key.compareRow(getMinKey()) <= 0));
}
void next(OrcStruct next) throws IOException {
- if (recordReader.hasNext()) {
- nextRecord = (OrcStruct) recordReader.next(next);
+ assert advancedToMinKey : "advanceToMinKey() was not called";
+ if (getRecordReader().hasNext()) {
+ nextRecord = (OrcStruct) getRecordReader().next(next);
// set the key
key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
OrcRecordUpdater.getBucket(nextRecord),
@@ -233,10 +250,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
statementId);
// if this record is larger than maxKey, we need to stop
- if (maxKey != null && key.compareRow(maxKey) > 0) {
- LOG.debug("key " + key + " > maxkey " + maxKey);
+ if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
+ LOG.debug("key " + key + " > maxkey " + getMaxKey());
nextRecord = null;
- recordReader.close();
+ getRecordReader().close();
}
} else {
nextRecord = null;
@@ -244,6 +261,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
}
+ RecordIdentifier getMinKey() {
+ return minKey;
+ }
+ RecordIdentifier getMaxKey() {
+ return maxKey;
+ }
int getColumns() {
return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
}
@@ -253,18 +276,192 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* A reader that pretends an original base file is a new version base file.
* It wraps the underlying reader's row with an ACID event object and
* makes the relevant translations.
+ *
+ * Running multiple Insert statements on the same partition (of a non-acid table) creates files
+ * like 000000_0, 000000_0_copy_1, 000000_0_copy_2, etc. So the OriginalReaderPair must treat all
+ * of these files as part of a single logical bucket file.
+ *
+ * For Compaction, where each split includes the whole bucket, this means reading over all the
+ * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket.
+ *
+ * For a read after the table is marked transactional but before it's rewritten into a base/
+ * by compaction, each of the original files may be split into many pieces. For each split we
+ * must make sure to include only the relevant part of each delta file.
+ * {@link OrcRawRecordMerger#minKey} and {@link OrcRawRecordMerger#maxKey} are computed for each
+ * split of the original file and used to filter rows from all the deltas. The ROW__ID.rowid for
+ * the rows of the 'original' file, of course, must be assigned from the beginning of the
+ * logical bucket.
*/
static final class OriginalReaderPair extends ReaderPair {
+ private final Options mergerOptions;
+ /**
+ * Sum total of all rows in all the files before the 'current' one in {@link #originalFiles} list
+ */
+ private long rowIdOffset = 0;
+ /**
+ * See {@link AcidUtils.Directory#getOriginalFiles()}. This list has a fixed sort order. This
+ * is the full list when compacting and empty when doing a simple read. The latter is because we
+ * only need to read the current split from one file for a simple read.
+ */
+ private final List<HadoopShims.HdfsFileStatusWithId> originalFiles;
+ /**
+ * index into {@link #originalFiles}
+ */
+ private int nextFileIndex = 0;
+ private long numRowsInCurrentFile = 0;
+ private RecordReader originalFileRecordReader = null;
+ private final Configuration conf;
+ private final Reader.Options options;
+ private final RecordIdentifier minKey;//shadow parent minKey to make final
+ private final RecordIdentifier maxKey;//shadow parent maxKey to make final
+
OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
- RecordIdentifier minKey, RecordIdentifier maxKey,
- Reader.Options options) throws IOException {
+ final RecordIdentifier minKey, final RecordIdentifier maxKey,
+ Reader.Options options, Options mergerOptions, Configuration conf,
+ ValidTxnList validTxnList) throws IOException {
super(key, reader, bucket, minKey, maxKey, options, 0);
+ this.mergerOptions = mergerOptions;
+ this.conf = conf;
+ this.options = options;
+ assert mergerOptions.getRootPath() != null : "Since we have original files";
+ assert bucket >= 0 : "don't support non-bucketed tables yet";
+
+ RecordIdentifier newMinKey = minKey;
+ RecordIdentifier newMaxKey = maxKey;
+ if(mergerOptions.isCompacting()) {
+ {
+ //when compacting each split needs to process the whole logical bucket
+ assert options.getOffset() == 0;
+ assert options.getMaxOffset() == Long.MAX_VALUE;
+ assert minKey == null;
+ assert maxKey == null;
+ }
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(
+ mergerOptions.getRootPath(), conf, validTxnList, false, true);
+ originalFiles = directoryState.getOriginalFiles();
+ assert originalFiles.size() > 0;
+ /**
+ * when there are no copy_N files, the {@link #recordReader} will be over the one and only
+ * file for 'bucket', but closing it here makes the flow cleaner and only happens once in the
+ * life of the table. With copy_N files, the caller may pass in any one of the copy_N files.
+ * This is less prone to bugs than expecting the caller to pass in a Reader for the 1st file
+ * of a logical bucket.*/
+ recordReader.close();
+ reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+ if(reader == null) {
+ //Compactor generated a split for a bucket that has no data?
+ throw new IllegalStateException("No 'original' files found for bucketId=" + bucket +
+ " in " + mergerOptions.getRootPath());
+ }
+ numRowsInCurrentFile = reader.getNumberOfRows();
+ originalFileRecordReader = reader.rowsOptions(options);
+ }
+ else {
+ /**
+ * Logically each bucket consists of 000000_0, 000000_0_copy_1 ... 000000_0_copy_N, etc. We don't
+ * know N a priori, so if this is true then the current split is from a 000000_0_copy_N file.
+ * It's needed to correctly set maxKey. In particular, set maxKey==null if this split
+ * is the tail of the last file for this logical bucket to include all deltas written after
+ * non-acid to acid table conversion.
+ */
+ boolean isLastFileForThisBucket = false;
+ boolean haveSeenCurrentFile = false;
+ originalFiles = Collections.emptyList();
+ if (mergerOptions.getCopyIndex() > 0) {
+ //the split is from something other than the 1st file of the logical bucket - compute offset
+
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+ conf, validTxnList, false, true);
+ for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+ AcidOutputFormat.Options bucketOptions =
+ AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+ if (bucketOptions.getBucket() != bucket) {
+ continue;
+ }
+ if(haveSeenCurrentFile) {
+ //if we get here, we already saw the current file and now found another file for the
+ //same bucket, so the current file is not the last file of the logical bucket
+ isLastFileForThisBucket = false;
+ break;
+ }
+ if(f.getFileStatus().getPath().equals(mergerOptions.getBucketPath())) {
+ /**
+ * found the file the current split is from, so we're done
+ * counting {@link #rowIdOffset}
+ */
+ haveSeenCurrentFile = true;
+ isLastFileForThisBucket = true;
+ continue;
+ }
+ Reader copyReader = OrcFile.createReader(f.getFileStatus().getPath(),
+ OrcFile.readerOptions(conf));
+ rowIdOffset += copyReader.getNumberOfRows();
+ }
+ if (rowIdOffset > 0) {
+ //rowIdOffset could be 0 if all files before current one are empty
+ /**
+ * Since {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
+ * int, Reader.Options)} has already run, we need to fix the min/max keys, since they are
+ * used by {@link #next(OrcStruct)}, which uses {@link #rowIdOffset} to generate the rowId
+ * for the key. */
+ if (minKey != null) {
+ minKey.setRowId(minKey.getRowId() + rowIdOffset);
+ }
+ else {
+ /**
+ * If this is not the 1st file, set minKey to 1 less than the start of the current file.
+ * (Would not need to set minKey if we knew that there are no delta files)
+ * {@link #advanceToMinKey()} needs this */
+ newMinKey = new RecordIdentifier(0, bucket, rowIdOffset - 1);
+ }
+ if (maxKey != null) {
+ maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
+ }
+ }
+ } else {
+ isLastFileForThisBucket = true;
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
+ conf, validTxnList, false, true);
+ int numFilesInBucket= 0;
+ for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+ AcidOutputFormat.Options bucketOptions =
+ AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+ if (bucketOptions.getBucket() == bucket) {
+ numFilesInBucket++;
+ if(numFilesInBucket > 1) {
+ isLastFileForThisBucket = false;
+ break;
+ }
+ }
+ }
+ }
+ originalFileRecordReader = recordReader;
+ if(!isLastFileForThisBucket && maxKey == null) {
+ /*
+ * If this is the last file for this bucket, maxKey == null means the split is the tail
+ * of the file so we want to leave it blank to make sure any insert events in delta
+ * files are included; Conversely, if it's not the last file, set the maxKey so that
+ * events from deltas that don't modify anything in the current split are excluded*/
+ newMaxKey = new RecordIdentifier(0, bucket,
+ rowIdOffset + reader.getNumberOfRows() - 1);
+ }
+ }
+ this.minKey = newMinKey;
+ this.maxKey = newMaxKey;
}
-
- @Override
- void next(OrcStruct next) throws IOException {
- if (recordReader.hasNext()) {
- long nextRowId = recordReader.getRowNumber();
+ @Override RecordReader getRecordReader() {
+ return originalFileRecordReader;
+ }
+ @Override RecordIdentifier getMinKey() {
+ return minKey;
+ }
+ @Override RecordIdentifier getMaxKey() {
+ return maxKey;
+ }
+ private boolean nextFromCurrentFile(OrcStruct next) throws IOException {
+ if (originalFileRecordReader.hasNext()) {
+ //RecordReader.getRowNumber() produces a file-global row number even with PPD
+ long nextRowId = originalFileRecordReader.getRowNumber() + rowIdOffset;
// have to do initialization here, because the super's constructor
// calls next and thus we need to initialize before our constructor
// runs
@@ -282,7 +479,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID,
new LongWritable(nextRowId));
nextRecord.setFieldValue(OrcRecordUpdater.ROW,
- recordReader.next(null));
+ originalFileRecordReader.next(null));
} else {
nextRecord = next;
((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
@@ -296,20 +493,62 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
.set(nextRowId);
nextRecord.setFieldValue(OrcRecordUpdater.ROW,
- recordReader.next(OrcRecordUpdater.getRow(next)));
+ originalFileRecordReader.next(OrcRecordUpdater.getRow(next)));
}
key.setValues(0L, bucket, nextRowId, 0L, 0);
if (maxKey != null && key.compareRow(maxKey) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("key " + key + " > maxkey " + maxKey);
}
- nextRecord = null;
- recordReader.close();
+ return false;//reached End Of Split
}
- } else {
- nextRecord = null;
- recordReader.close();
+ return true;
}
+ return false;//reached EndOfFile
+ }
+ @Override
+ void next(OrcStruct next) throws IOException {
+ assert advancedToMinKey : "advanceToMinKey() was not called";
+ while(true) {
+ if(nextFromCurrentFile(next)) {
+ return;
+ } else {
+ if (originalFiles.size() <= nextFileIndex) {
+ //no more original files to read
+ nextRecord = null;
+ originalFileRecordReader.close();
+ return;
+ } else {
+ assert mergerOptions.isCompacting() : "originalFiles.size() should be 0 when not compacting";
+ rowIdOffset += numRowsInCurrentFile;
+ originalFileRecordReader.close();
+ Reader reader = advanceToNextFile();
+ if(reader == null) {
+ nextRecord = null;
+ return;
+ }
+ numRowsInCurrentFile = reader.getNumberOfRows();
+ originalFileRecordReader = reader.rowsOptions(options);
+ }
+ }
+ }
+ }
+ /**
+ * Finds the next file of the logical bucket
+ * @return {@code null} if there are no more files
+ */
+ private Reader advanceToNextFile() throws IOException {
+ while(nextFileIndex < originalFiles.size()) {
+ AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
+ if (bucketOptions.getBucket() == bucket) {
+ break;
+ }
+ nextFileIndex++;
+ }
+ if(originalFiles.size() <= nextFileIndex) {
+ return null;//no more files for current bucket
+ }
+ return OrcFile.createReader(originalFiles.get(nextFileIndex++).getFileStatus().getPath(), OrcFile.readerOptions(conf));
}
@Override
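
To make the rowIdOffset bookkeeping above concrete, a toy walk-through for a hypothetical
logical bucket (the file row counts are invented):

public class RowIdOffsetDemo {
  public static void main(String[] args) {
    String[] files = {"000001_0", "000001_0_copy_1", "000001_0_copy_2"};
    long[] rows   = {100, 50, 25};
    long rowIdOffset = 0;
    for (int i = 0; i < files.length; i++) {
      System.out.println(files[i] + ": rowIds " + rowIdOffset
          + ".." + (rowIdOffset + rows[i] - 1));
      rowIdOffset += rows[i]; // sum of rows in all earlier files of the bucket
    }
    // 000001_0: 0..99, copy_1: 100..149, copy_2: 150..174 -- one continuous
    // sequence, which is what compaction writes into base_N/bucket_00001.
  }
}
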
@@ -318,6 +557,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
}
+ /**
+ * The process here reads several (base + some deltas) files each of which is sorted on
+ * {@link ReaderKey} ascending. The output of this Reader should be a global order across these
+ * files. The root of this tree is always the next 'file' to read from.
+ */
private final TreeMap<ReaderKey, ReaderPair> readers =
new TreeMap<ReaderKey, ReaderPair>();
@@ -327,6 +571,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
// The key of the next lowest reader.
private ReaderKey secondaryKey = null;
+ private static final class KeyInterval {
+ private final RecordIdentifier minKey;
+ private final RecordIdentifier maxKey;
+ private KeyInterval(RecordIdentifier minKey, RecordIdentifier maxKey) {
+ this.minKey = minKey;
+ this.maxKey = maxKey;
+ }
+ private RecordIdentifier getMinKey() {
+ return minKey;
+ }
+ private RecordIdentifier getMaxKey() {
+ return maxKey;
+ }
+ }
/**
* Find the key range for original bucket files.
* @param reader the reader
@@ -334,14 +592,24 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* @param options the options for reading with
* @throws IOException
*/
- private void discoverOriginalKeyBounds(Reader reader, int bucket,
+ private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
Reader.Options options
) throws IOException {
long rowLength = 0;
long rowOffset = 0;
- long offset = options.getOffset();
- long maxOffset = options.getMaxOffset();
+ long offset = options.getOffset();//this would usually be at block boundary
+ long maxOffset = options.getMaxOffset();//this would usually be at block boundary
boolean isTail = true;
+ RecordIdentifier minKey = null;
+ RecordIdentifier maxKey = null;
+ /**
+ * options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
+ * necessarily match stripe boundary. So we want to come up with minKey to be one before the 1st
+ * row of the first stripe that starts after getOffset() and maxKey to be the last row of the
+ * stripe that contains getMaxOffset(). This breaks if getOffset() and getMaxOffset() are inside
+ * the same stripe - in this case we have minKey & isTail=false but rowLength is never set.
+ * (HIVE-16953)
+ */
for(StripeInformation stripe: reader.getStripes()) {
if (offset > stripe.getOffset()) {
rowOffset += stripe.getNumberOfRows();
@@ -358,6 +626,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if (!isTail) {
maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1);
}
+ return new KeyInterval(minKey, maxKey);
}
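
A self-contained numeric sketch of the bounds logic above (stripe byte offsets and row
counts are invented; a split covering bytes [10000, 20000) selects only the middle stripe):

public class OriginalBoundsDemo {
  public static void main(String[] args) {
    long[] stripeOffsets = {0, 10_000, 20_000};   // invented byte offsets
    long[] stripeRows    = {1_000, 1_000, 1_000}; // invented row counts
    long offset = 10_000, maxOffset = 20_000;     // the split's byte range
    long rowOffset = 0, rowLength = 0;
    boolean isTail = true;
    Long minRowId = null, maxRowId = null;
    for (int i = 0; i < stripeOffsets.length; i++) {
      if (offset > stripeOffsets[i]) {
        rowOffset += stripeRows[i];          // rows entirely before the split
      } else if (maxOffset > stripeOffsets[i]) {
        rowLength += stripeRows[i];          // rows inside the split
      } else {
        isTail = false;
        break;
      }
    }
    if (rowOffset > 0) minRowId = rowOffset - 1;       // one before the first row
    if (!isTail) maxRowId = rowOffset + rowLength - 1; // last row of last stripe
    System.out.println("minKey.rowId=" + minRowId + " maxKey.rowId=" + maxRowId);
    // prints minKey.rowId=999 maxKey.rowId=1999
  }
}
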
/**
@@ -366,7 +635,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* @param options the options for reading with
* @throws IOException
*/
- private void discoverKeyBounds(Reader reader,
+ private KeyInterval discoverKeyBounds(Reader reader,
Reader.Options options) throws IOException {
RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
long offset = options.getOffset();
@@ -374,6 +643,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
int firstStripe = 0;
int stripeCount = 0;
boolean isTail = true;
+ RecordIdentifier minKey = null;
+ RecordIdentifier maxKey = null;
+
List<StripeInformation> stripes = reader.getStripes();
for(StripeInformation stripe: stripes) {
if (offset > stripe.getOffset()) {
@@ -391,6 +663,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if (!isTail) {
maxKey = keyIndex[firstStripe + stripeCount - 1];
}
+ return new KeyInterval(minKey, maxKey);
}
/**
@@ -416,6 +689,49 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return result;
}
+ static class Options {
+ private int copyIndex = 0;
+ private boolean isCompacting = false;
+ private Path bucketPath;
+ private Path rootPath;
+ Options copyIndex(int copyIndex) {
+ assert copyIndex >= 0;
+ this.copyIndex = copyIndex;
+ return this;
+ }
+ Options isCompacting(boolean isCompacting) {
+ this.isCompacting = isCompacting;
+ return this;
+ }
+ Options bucketPath(Path bucketPath) {
+ this.bucketPath = bucketPath;
+ return this;
+ }
+ Options rootPath(Path rootPath) {
+ this.rootPath = rootPath;
+ return this;
+ }
+ /**
+ * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
+ */
+ int getCopyIndex() {
+ return copyIndex;
+ }
+ boolean isCompacting() {
+ return isCompacting;
+ }
+ /**
+ * @return full path to the data file
+ */
+ Path getBucketPath() {
+ return bucketPath;
+ }
+ /**
+ * Partition folder (Table folder if not partitioned)
+ */
+ Path getRootPath() { return rootPath; }
+ }
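
A condensed mirror of the two call sites in this patch (Options is package-private, hence
the package declaration; paths are hypothetical):

package org.apache.hadoop.hive.ql.io.orc;

import org.apache.hadoop.fs.Path;

class MergerOptionsDemo {
  static OrcRawRecordMerger.Options forRead(Path root, Path splitPath, int copyIdx) {
    return new OrcRawRecordMerger.Options()
        .isCompacting(false)
        .rootPath(root)          // partition (or unpartitioned table) dir
        .copyIndex(copyIdx)      // 0 = the base 000001_0 file, N = its _copy_N
        .bucketPath(splitPath);  // the actual file the split points at
  }
  static OrcRawRecordMerger.Options forCompaction(Path root) {
    // compaction reads the whole logical bucket, so no copyIndex/bucketPath needed
    return new OrcRawRecordMerger.Options().isCompacting(true).rootPath(root);
  }
}
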
/**
* Create a reader that merge sorts the ACID events together.
* @param conf the configuration
@@ -433,7 +749,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
int bucket,
ValidTxnList validTxnList,
Reader.Options options,
- Path[] deltaDirectory) throws IOException {
+ Path[] deltaDirectory, Options mergerOptions) throws IOException {
this.conf = conf;
this.collapse = collapseEvents;
this.offset = options.getOffset();
@@ -450,32 +766,36 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
Reader.Options eventOptions = createEventOptions(options);
if (reader == null) {
baseReader = null;
+ minKey = maxKey = null;
} else {
-
- // find the min/max based on the offset and length
+ KeyInterval keyInterval;
+ // find the min/max based on the offset and length (and more for 'original')
if (isOriginal) {
- discoverOriginalKeyBounds(reader, bucket, options);
+ keyInterval = discoverOriginalKeyBounds(reader, bucket, options);
} else {
- discoverKeyBounds(reader, options);
+ keyInterval = discoverKeyBounds(reader, options);
}
- LOG.info("min key = " + minKey + ", max key = " + maxKey);
+ LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
// use the min/max instead of the byte range
ReaderPair pair;
ReaderKey key = new ReaderKey();
if (isOriginal) {
options = options.clone();
- pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey,
- options);
+ pair = new OriginalReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+ options, mergerOptions, conf, validTxnList);
} else {
- pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
+ pair = new ReaderPair(key, reader, bucket, keyInterval.getMinKey(), keyInterval.getMaxKey(),
eventOptions, 0);
}
-
+ minKey = pair.getMinKey();
+ maxKey = pair.getMaxKey();
+ LOG.info("updated min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
+ pair.advanceToMinKey();
// if there is at least one record, put it in the map
if (pair.nextRecord != null) {
readers.put(key, pair);
}
- baseReader = pair.recordReader;
+ baseReader = pair.getRecordReader();
}
// we always want to read all of the deltas
@@ -504,6 +824,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
ReaderPair deltaPair;
deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+ deltaPair.advanceToMinKey();
if (deltaPair.nextRecord != null) {
readers.put(key, deltaPair);
}
@@ -648,6 +969,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
@Override
public float getProgress() throws IOException {
+ //this is not likely to do the right thing for Compaction of "original" files when there are copyN files
return baseReader == null ? 1 : baseReader.getProgress();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 75c7680..29f5a8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
import java.util.Arrays;
import java.util.BitSet;
-import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -42,9 +41,6 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.orc.OrcProto;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.TypeDescription;
import org.apache.orc.impl.AcidStats;
import org.apache.orc.impl.OrcAcidUtils;
import org.slf4j.Logger;
@@ -360,8 +356,11 @@ public class VectorizedOrcAcidRowBatchReader
int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+ OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
+ assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergerOptions properly";
this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
- validTxnList, readerOptions, deleteDeltas);
+ validTxnList, readerOptions, deleteDeltas,
+ mergerOptions);
this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
this.deleteRecordValue = this.deleteRecords.createValue();
// Initialize the first value in the delete reader.
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 73710a7..7019f4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3129,7 +3129,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
* I'll leave the below loop for now until a better approach is found.
*/
for (int counter = 1; destFs.exists(destFilePath); counter++) {
- destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
+ destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : ""));
}
if (isRenameAllowed) {
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index f83b6db..bafed9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -251,6 +251,7 @@ public class CompactorMR {
// There are original format files
for (HdfsFileStatusWithId stat : originalFiles) {
Path path = stat.getFileStatus().getPath();
+ //note that originalFiles contains all original files recursively, not dirs
dirsToSearch.add(path);
LOG.debug("Adding original file " + path + " to dirs to search");
}
@@ -275,6 +276,12 @@ public class CompactorMR {
su.gatherStats();
}
+
+ /**
+ * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
+ * If it's base_xxxx, it's in dirsToSearch, else the actual original files
+ * (all leaves recursively) are in the dirsToSearch list
+ */
private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
StringableList dirsToSearch,
List<AcidUtils.ParsedDelta> parsedDeltas,
@@ -363,7 +370,7 @@ public class CompactorMR {
* @param hadoopConf
* @param bucket bucket to be processed by this split
* @param files actual files this split should process. It is assumed the caller has already
- * parsed out the files in base and deltas to populate this list.
+ * parsed out the files in base and deltas to populate this list. Includes copy_N
* @param base directory of the base, or the partition/table location if the files are in old
* style. Can be null.
* @param deltas directories of the delta files.
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 7c66955..5fb89d0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -105,6 +105,7 @@ public class TestTxnCommands {
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
hiveConf
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
@@ -880,4 +881,55 @@ public class TestTxnCommands {
runStatementOnDriver("create table if not exists e011_02 (c1 float, c2 double, c3 float)");
runStatementOnDriver("merge into merge_test using e011_02 on (merge_test.c1 = e011_02.c1) when not matched then insert values (case when e011_02.c1 > 0 then e011_02.c1 + 1 else e011_02.c1 end, e011_02.c2 + e011_02.c3, coalesce(e011_02.c3, 1))");
}
+ /**
+ * HIVE-16177
+ * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}
+ */
+ @Test
+ public void testNonAcidToAcidConversion01() throws Exception {
+ //create 1 row in a file 000001_0 (and an empty 000000_0)
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+ //create 1 row in a file 000000_0_copy_1 and 1 row in a file 000001_0_copy_1
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(1,5)");
+
+ //convert the table to Acid
+ runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+ //create a delta directory
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)");
+
+ //make sure we assign correct Ids
+ List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
+ LOG.warn("before compact");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("", 4, rs.size());
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
+ //run Compaction
+ runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
+ LOG.warn("after compact");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("", 4, rs.size());
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+ Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
+
+ //make sure they are the same before and after compaction
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5786c4f..31921f1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -319,6 +319,68 @@ public class TestTxnCommands2 {
resultData = new int[][] {{3,8}, {5,6}, {9,20}};
Assert.assertEquals(stringifyValues(resultData), rs);
}
+ /**
+ * see HIVE-16177
+ * See also {@link TestTxnCommands#testNonAcidToAcidConversion01()}
+ */
+ @Test
+ public void testNonAcidToAcidConversion02() throws Exception {
+ //create 2 rows in a file 000001_0 (and an empty 000000_0)
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2),(1,3)");
+ //create 2 rows in a file 000000_0_copy_1 and 2 rows in a file 000001_0_copy_1
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(0,13),(1,4),(1,5)");
+ //create 1 row in a file 000001_0_copy_2 (and an empty 000000_0_copy_2?)
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,6)");
+
+ //convert the table to Acid
+ runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
+ List<String> rs1 = runStatementOnDriver("describe "+ Table.NONACIDORCTBL);
+ //create some delta directories
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,15),(1,16)");
+ runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b = 120 where a = 0 and b = 12");
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,17)");
+ runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1 and b = 3");
+
+ List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
+ LOG.warn("before compact");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ /*
+ * All ROW__IDs are unique on read after conversion to acid
+ * ROW__IDs are exactly the same before and after compaction
+ * Also check the file name after compaction for completeness
+ */
+ String[][] expected = {
+ {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13", "bucket_00000"},
+ {"{\"transactionid\":18,\"bucketid\":0,\"rowid\":0}\t0\t15", "bucket_00000"},
+ {"{\"transactionid\":20,\"bucketid\":0,\"rowid\":0}\t0\t17", "bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6", "bucket_00001"},
+ {"{\"transactionid\":18,\"bucketid\":1,\"rowid\":0}\t1\t16", "bucket_00001"}
+ };
+ Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ }
+ //run Compaction
+ runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
+ LOG.warn("after compact");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals("Unexpected row count after compaction", expected.length, rs.size());
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ Assert.assertTrue("Actual line(bucket) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+ //this confirms the ROW__IDs are identical before and after compaction
+ }
/**
* Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
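For readers following the _copy_N handling this test exercises: below is a minimal, self-contained sketch of how a bucket file name with a copy suffix can be decomposed. The regex is an illustrative assumption written for this note, not the actual COPY_FILE_NAME_TO_TASK_ID_REGEX in Utilities; only the COPY_KEYWORD value ("_copy_"), which this patch makes public, is taken from the change itself.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CopyFileNameSketch {
  // mirrors Utilities.COPY_KEYWORD, made public by this patch
  private static final String COPY_KEYWORD = "_copy_";
  // illustrative pattern: taskId, attemptId, optional copy index
  private static final Pattern BUCKET_FILE =
      Pattern.compile("^([0-9]+)_([0-9]+)(?:" + COPY_KEYWORD + "([0-9]+))?$");

  public static void main(String[] args) {
    for (String name : new String[]{"000001_0", "000001_0_copy_2"}) {
      Matcher m = BUCKET_FILE.matcher(name);
      if (m.matches()) {
        System.out.println(name + " -> taskId=" + m.group(1)
            + " attemptId=" + m.group(2)
            + " copyIndex=" + (m.group(3) == null ? "0" : m.group(3)));
      }
    }
  }
}
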
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index a7ff9a3..c928732 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
@@ -142,6 +143,10 @@ public class TestAcidUtils {
Configuration conf = new Configuration();
MockFileSystem fs = new MockFileSystem(conf,
new MockFile("mock:/tbl/part1/000000_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1",
+ 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "2",
+ 500, new byte[0]),
new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]),
new MockFile("mock:/tbl/part1/000002_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/random", 500, new byte[0]),
@@ -154,13 +159,17 @@ public class TestAcidUtils {
assertEquals(0, dir.getCurrentDirectories().size());
assertEquals(0, dir.getObsolete().size());
List<HdfsFileStatusWithId> result = dir.getOriginalFiles();
- assertEquals(5, result.size());
+ assertEquals(7, result.size());
assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString());
- assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString());
- assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString());
- assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "1",
+ result.get(1).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/000000_0" + Utilities.COPY_KEYWORD + "2",
+ result.get(2).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/000001_1", result.get(3).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/000002_0", result.get(4).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/random", result.get(5).getFileStatus().getPath().toString());
assertEquals("mock:/tbl/part1/subdir/000000_0",
- result.get(4).getFileStatus().getPath().toString());
+ result.get(6).getFileStatus().getPath().toString());
}
@Test
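The reshuffled result indices in the hunk above follow directly from lexicographic path ordering of the original files; a small stand-alone sketch (assuming plain String ordering, which is what the updated assertions encode) reproduces the expected sequence:

import java.util.Arrays;
import java.util.List;

public class OriginalFileOrderSketch {
  public static void main(String[] args) {
    List<String> paths = Arrays.asList(
        "mock:/tbl/part1/000002_0",
        "mock:/tbl/part1/000000_0_copy_2",
        "mock:/tbl/part1/random",
        "mock:/tbl/part1/000000_0",
        "mock:/tbl/part1/subdir/000000_0",
        "mock:/tbl/part1/000001_1",
        "mock:/tbl/part1/000000_0_copy_1");
    // Sorting lexicographically groups each _copy_N file directly after
    // the bucket file it duplicates, matching the test's assertions.
    paths.stream().sorted().forEach(System.out::println);
  }
}
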
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index bb79857..43ed238 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -910,7 +910,7 @@ public class TestInputOutputFormat {
MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
return OrcInputFormat.determineSplitStrategies(combineCtx, context,
- adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+ adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
null, null, true);
}
@@ -924,7 +924,7 @@ public class TestInputOutputFormat {
OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
OrcInputFormat.AcidDirInfo adi = gen.call();
return OrcInputFormat.determineSplitStrategies(
- null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+ null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
null, null, true);
}
@@ -3397,7 +3397,9 @@ public class TestInputOutputFormat {
// call-2: open to read data - split 1 => mock:/mocktable5/0_0
// call-3: open to read footer - split 2 => mock:/mocktable5/0_1
// call-4: open to read data - split 2 => mock:/mocktable5/0_1
- assertEquals(4, readOpsDelta);
+ // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_0
+ // call-6: AcidUtils.getAcidState - getLen() mock:/mocktable5/0_1
+ assertEquals(6, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3470,7 +3472,9 @@ public class TestInputOutputFormat {
}
// call-1: open to read data - split 1 => mock:/mocktable6/0_0
// call-2: open to read data - split 2 => mock:/mocktable6/0_1
- assertEquals(2, readOpsDelta);
+ // call-3: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_0
+ // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable6/0_1
+ assertEquals(4, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3548,7 +3552,8 @@ public class TestInputOutputFormat {
// call-2: open to read data - split 1 => mock:/mocktable7/0_0
// call-3: open side file (flush length) of delta directory
// call-4: fs.exists() check for delta_xxx_xxx/bucket_00000 file
- assertEquals(4, readOpsDelta);
+ // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable7/0_0
+ assertEquals(5, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3625,7 +3630,8 @@ public class TestInputOutputFormat {
// call-1: open to read data - split 1 => mock:/mocktable8/0_0
// call-2: open side file (flush length) of delta directory
// call-3: fs.exists() check for delta_xxx_xxx/bucket_00000 file
- assertEquals(3, readOpsDelta);
+ // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable8/0_0
+ assertEquals(4, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 1ce1bfb..584bd3b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -189,6 +189,7 @@ public class TestOrcRawRecordMerger {
RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
new Reader.Options(), 0);
+ pair.advnaceToMinKey();
RecordReader recordReader = pair.recordReader;
assertEquals(10, key.getTransactionId());
assertEquals(20, key.getBucketId());
@@ -215,6 +216,7 @@ public class TestOrcRawRecordMerger {
ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
new Reader.Options(), 0);
+ pair.advnaceToMinKey();
RecordReader recordReader = pair.recordReader;
assertEquals(10, key.getTransactionId());
assertEquals(20, key.getBucketId());
@@ -290,8 +292,14 @@ public class TestOrcRawRecordMerger {
RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
boolean[] includes = new boolean[]{true, true};
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path root = new Path(tmpDir, "testOriginalReaderPair");
+ fs.makeQualified(root);
+ fs.create(root);
ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
- new Reader.Options().include(includes));
+ new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+ pair.advnaceToMinKey();
RecordReader recordReader = pair.recordReader;
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketId());
@@ -319,8 +327,14 @@ public class TestOrcRawRecordMerger {
public void testOriginalReaderPairNoMin() throws Exception {
ReaderKey key = new ReaderKey();
Reader reader = createMockOriginalReader();
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
+ fs.makeQualified(root);
+ fs.create(root);
ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
- new Reader.Options());
+ new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+ pair.advnaceToMinKey();
assertEquals("first", value(pair.nextRecord));
assertEquals(0, key.getTransactionId());
assertEquals(10, key.getBucketId());
@@ -423,7 +437,7 @@ public class TestOrcRawRecordMerger {
OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
false, 10, createMaximalTxnList(),
- new Reader.Options().range(1000, 1000), null);
+ new Reader.Options().range(1000, 1000), null, new OrcRawRecordMerger.Options());
RecordReader rr = merger.getCurrentReader().recordReader;
assertEquals(0, merger.getOtherReaders().size());
@@ -531,7 +545,7 @@ public class TestOrcRawRecordMerger {
OrcRawRecordMerger merger =
new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
- AcidUtils.getPaths(directory.getCurrentDirectories()));
+ AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
RecordIdentifier key = merger.createKey();
OrcStruct value = merger.createValue();
assertEquals(false, merger.next(key, value));
@@ -606,7 +620,7 @@ public class TestOrcRawRecordMerger {
OrcRawRecordMerger merger =
new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
- AcidUtils.getPaths(directory.getCurrentDirectories()));
+ AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
assertEquals(null, merger.getMinKey());
assertEquals(null, merger.getMaxKey());
RecordIdentifier id = merger.createKey();
@@ -681,7 +695,7 @@ public class TestOrcRawRecordMerger {
// make a merger that doesn't collapse events
merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
- AcidUtils.getPaths(directory.getCurrentDirectories()));
+ AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
@@ -780,7 +794,7 @@ public class TestOrcRawRecordMerger {
merger =
new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
txns, new Reader.Options(),
- AcidUtils.getPaths(directory.getCurrentDirectories()));
+ AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
for(int i=0; i < values.length; ++i) {
assertEquals(true, merger.next(id, event));
LOG.info("id = " + id + "event = " + event);
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 6bf1312..73bc1ab 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -182,7 +182,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
OrcInputFormat.AcidDirInfo adi = gen.call();
List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
- null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+ null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
null, null, true);
assertEquals(1, splitStrategies.size());
List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/queries/clientpositive/insert_orig_table.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/insert_orig_table.q b/ql/src/test/queries/clientpositive/insert_orig_table.q
index de408d9..544fe11 100644
--- a/ql/src/test/queries/clientpositive/insert_orig_table.q
+++ b/ql/src/test/queries/clientpositive/insert_orig_table.q
@@ -1,9 +1,23 @@
-set hive.strict.checks.bucketing=false;
-
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+drop table if exists acid_iot_stage;
+create table acid_iot_stage(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN) stored as orc;
+LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage;
+
create table acid_iot(
ctinyint TINYINT,
csmallint SMALLINT,
@@ -18,7 +32,7 @@ create table acid_iot(
cboolean1 BOOLEAN,
cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true');
-LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot;
+insert into acid_iot select * from acid_iot_stage;
select count(*) from acid_iot;
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/queries/clientpositive/insert_values_orig_table.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/insert_values_orig_table.q b/ql/src/test/queries/clientpositive/insert_values_orig_table.q
index db4fc82..1319a0a 100644
--- a/ql/src/test/queries/clientpositive/insert_values_orig_table.q
+++ b/ql/src/test/queries/clientpositive/insert_values_orig_table.q
@@ -1,9 +1,23 @@
-set hive.strict.checks.bucketing=false;
-
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+drop table if exists acid_ivot_stage;
+create table acid_ivot_stage(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN) stored as orc;
+LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot_stage;
+
create table acid_ivot(
ctinyint TINYINT,
csmallint SMALLINT,
@@ -18,7 +32,7 @@ create table acid_ivot(
cboolean1 BOOLEAN,
cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true');
-LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot;
+insert into acid_ivot select * from acid_ivot_stage;
select count(*) from acid_ivot;
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q b/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q
index 49c5b0a..2f366fb 100644
--- a/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q
+++ b/ql/src/test/queries/clientpositive/insert_values_orig_table_use_metadata.q
@@ -1,10 +1,25 @@
-set hive.strict.checks.bucketing=false;
-
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.compute.query.using.stats=true;
+drop table if exists acid_ivot_stage;
+create table acid_ivot_stage(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN) stored as orc;
+
+LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot_stage;
+
create table acid_ivot(
ctinyint TINYINT,
csmallint SMALLINT,
@@ -21,7 +36,7 @@ create table acid_ivot(
desc formatted acid_ivot;
-LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot;
+insert into acid_ivot select * from acid_ivot_stage;
desc formatted acid_ivot;
@@ -71,7 +86,7 @@ explain select count(*) from acid_ivot;
select count(*) from acid_ivot;
-LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot;
+insert into acid_ivot select * from acid_ivot_stage;
desc formatted acid_ivot;
http://git-wip-us.apache.org/repos/asf/hive/blob/37f84ca0/ql/src/test/results/clientpositive/insert_orig_table.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/insert_orig_table.q.out b/ql/src/test/results/clientpositive/insert_orig_table.q.out
index 5eea74d..a62e0ec 100644
--- a/ql/src/test/results/clientpositive/insert_orig_table.q.out
+++ b/ql/src/test/results/clientpositive/insert_orig_table.q.out
@@ -1,3 +1,47 @@
+PREHOOK: query: drop table if exists acid_iot_stage
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists acid_iot_stage
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table acid_iot_stage(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@acid_iot_stage
+POSTHOOK: query: create table acid_iot_stage(
+ ctinyint TINYINT,
+ csmallint SMALLINT,
+ cint INT,
+ cbigint BIGINT,
+ cfloat FLOAT,
+ cdouble DOUBLE,
+ cstring1 STRING,
+ cstring2 STRING,
+ ctimestamp1 TIMESTAMP,
+ ctimestamp2 TIMESTAMP,
+ cboolean1 BOOLEAN,
+ cboolean2 BOOLEAN) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@acid_iot_stage
+PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@acid_iot_stage
+POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot_stage
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@acid_iot_stage
PREHOOK: query: create table acid_iot(
ctinyint TINYINT,
csmallint SMALLINT,
@@ -30,14 +74,26 @@ POSTHOOK: query: create table acid_iot(
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@acid_iot
-PREHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot
-PREHOOK: type: LOAD
-#### A masked pattern was here ####
+PREHOOK: query: insert into acid_iot select * from acid_iot_stage
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_iot_stage
PREHOOK: Output: default@acid_iot
-POSTHOOK: query: LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_iot
-POSTHOOK: type: LOAD
-#### A masked pattern was here ####
+POSTHOOK: query: insert into acid_iot select * from acid_iot_stage
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_iot_stage
POSTHOOK: Output: default@acid_iot
+POSTHOOK: Lineage: acid_iot.cbigint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cboolean1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cboolean2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cdouble SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cfloat SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: acid_iot.csmallint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cstring1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: acid_iot.cstring2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: acid_iot.ctimestamp1 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: acid_iot.ctimestamp2 SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: acid_iot.ctinyint SIMPLE [(acid_iot_stage)acid_iot_stage.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
PREHOOK: query: select count(*) from acid_iot
PREHOOK: type: QUERY
PREHOOK: Input: default@acid_iot