Posted to commits@hive.apache.org by su...@apache.org on 2015/07/20 22:12:18 UTC
[08/50] [abbrv] hive git commit: HIVE-11030 - Enhance storage layer to create one delta file per write (Eugene Koifman, reviewed by Alan Gates)
HIVE-11030 - Enhance storage layer to create one delta file per write (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/66feedc5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/66feedc5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/66feedc5
Branch: refs/heads/spark
Commit: 66feedc5569de959a383e0a58d9e8768bbad0e2c
Parents: 5c94bda
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Jul 13 09:11:28 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Jul 13 09:11:28 2015 -0700
----------------------------------------------------------------------
.../streaming/AbstractRecordWriter.java | 4 +-
.../streaming/mutate/worker/MutatorImpl.java | 4 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 1 +
.../hadoop/hive/ql/io/AcidInputFormat.java | 60 +++++++-
.../hadoop/hive/ql/io/AcidOutputFormat.java | 49 +++++-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 152 +++++++++++++++----
.../hadoop/hive/ql/io/HiveFileFormatUtils.java | 19 +--
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 20 +--
.../hadoop/hive/ql/io/orc/OrcNewSplit.java | 13 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 66 ++++++--
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 58 +++++++
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 16 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 20 ++-
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 4 +
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 3 +
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 +-
.../hadoop/hive/ql/plan/FileSinkDesc.java | 27 +++-
.../hive/ql/txn/compactor/CompactorMR.java | 4 +-
.../hive/ql/exec/TestFileSinkOperator.java | 3 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 73 ++++++++-
.../hive/ql/io/orc/TestInputOutputFormat.java | 13 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 57 ++++---
.../hive/ql/io/orc/TestOrcRecordUpdater.java | 6 +-
.../hive/ql/txn/compactor/CompactorTest.java | 20 ++-
.../hive/ql/txn/compactor/TestCleaner.java | 8 +-
.../hive/ql/txn/compactor/TestCleaner2.java | 14 ++
.../hive/ql/txn/compactor/TestInitiator.java | 4 +
.../hive/ql/txn/compactor/TestWorker.java | 49 +++---
.../hive/ql/txn/compactor/TestWorker2.java | 16 ++
29 files changed, 645 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
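
In short: each write statement inside a transaction now gets its own delta directory, suffixed
with a per-statement id, while compactor output keeps the old two-part name. A hypothetical
partition layout after a two-statement transaction 60 and a minor compaction of transactions
100-200 (paths invented for illustration):

    base_0000050/bucket_00000
    delta_0000060_0000060_0000/bucket_00000     statement 0 of txn 60
    delta_0000060_0000060_0001/bucket_00000     statement 1 of txn 60
    delta_0000100_0000200/bucket_00000          minor-compacted range, no statement id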
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index ed46bca..c959222 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -143,7 +143,9 @@ abstract class AbstractRecordWriter implements RecordWriter {
.inspector(getSerde().getObjectInspector())
.bucket(bucketId)
.minimumTransactionId(minTxnId)
- .maximumTransactionId(maxTxnID));
+ .maximumTransactionId(maxTxnID)
+ .statementId(-1)
+ .finalDestination(partitionPath));
} catch (SerDeException e) {
throw new SerializationError("Failed to get object inspector from Serde "
+ getSerde().getClass().getName(), e);
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
index 0fe41d5..52062f8 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
@@ -78,7 +78,9 @@ public class MutatorImpl implements Mutator {
.bucket(bucketId)
.minimumTransactionId(transactionId)
.maximumTransactionId(transactionId)
- .recordIdColumn(recordIdColumn));
+ .recordIdColumn(recordIdColumn)
+ .finalDestination(partitionPath)
+ .statementId(-1));
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 934cb42..b74e5fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -988,6 +988,7 @@ public class Driver implements CommandProcessor {
if (acidSinks != null) {
for (FileSinkDesc desc : acidSinks) {
desc.setTransactionId(txnId);
+ desc.setStatementId(txnMgr.getStatementId());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index e1d2395..24506b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -22,13 +22,19 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
/**
* The interface required for input formats that want to support ACID
@@ -62,7 +68,7 @@ import java.io.IOException;
* <li>New format -
* <pre>
* $partition/base_$tid/$bucket
- * delta_$tid_$tid/$bucket
+ * delta_$tid_$tid_$stid/$bucket
* </pre></li>
* </ul>
* <p>
@@ -71,6 +77,8 @@ import java.io.IOException;
* stored sorted by the original transaction id (ascending), bucket (ascending),
* row id (ascending), and current transaction id (descending). Thus the files
* can be merged by advancing through the files in parallel.
+ * The stid is the unique id (within the transaction) of the statement that created
+ * this delta file.
* <p>
* The base files include all transactions from the beginning of time
* (transaction id 0) to the transaction in the directory name. Delta
@@ -91,7 +99,7 @@ import java.io.IOException;
* For row-at-a-time processing, KEY can conveniently pass RowId into the operator
* pipeline. For vectorized execution the KEY could perhaps represent a range in the batch.
* Since {@link org.apache.hadoop.hive.ql.io.orc.OrcInputFormat} is declared to return
- * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidRecordReader} is defined
+ * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader} is defined
* to provide access to the RowId. Other implementations of AcidInputFormat can use either
* mechanism.
* </p>
@@ -101,6 +109,54 @@ import java.io.IOException;
public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
extends InputFormat<KEY, VALUE>, InputFormatChecker {
+ static final class DeltaMetaData implements Writable {
+ private long minTxnId;
+ private long maxTxnId;
+ private List<Integer> stmtIds;
+
+ public DeltaMetaData() {
+ this(0,0,null);
+ }
+ DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.stmtIds = stmtIds;
+ }
+ long getMinTxnId() {
+ return minTxnId;
+ }
+ long getMaxTxnId() {
+ return maxTxnId;
+ }
+ List<Integer> getStmtIds() {
+ return stmtIds;
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(minTxnId);
+ out.writeLong(maxTxnId);
+ //write a zero count when there are no statement ids so readFields() sees a valid length
+ if(stmtIds == null) {
+ out.writeInt(0);
+ return;
+ }
+ out.writeInt(stmtIds.size());
+ for(Integer id : stmtIds) {
+ out.writeInt(id);
+ }
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ minTxnId = in.readLong();
+ maxTxnId = in.readLong();
+ int numStatements = in.readInt();
+ if(numStatements <= 0) {
+ return;
+ }
+ stmtIds = new ArrayList<>();
+ for(int i = 0; i < numStatements; i++) {
+ stmtIds.add(in.readInt());
+ }
+ }
+ }
/**
* Options for controlling the record readers.
*/
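
For reference, a minimal sketch of round-tripping DeltaMetaData through its Writable methods.
It assumes same-package access (the three-argument constructor above is package-private); the
stream classes come from java.io and Arrays/ArrayList from java.util.

    AcidInputFormat.DeltaMetaData dmd =
        new AcidInputFormat.DeltaMetaData(62L, 62L, new ArrayList<>(Arrays.asList(0, 3)));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    dmd.write(new DataOutputStream(bytes));        // min txn, max txn, count, then each stmt id

    AcidInputFormat.DeltaMetaData copy = new AcidInputFormat.DeltaMetaData();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // copy now holds minTxnId=62, maxTxnId=62, stmtIds=[0, 3]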
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index 0d537e1..dd90a95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -39,7 +39,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
/**
* Options to control how the files are written
*/
- public static class Options {
+ public static class Options implements Cloneable {
private final Configuration configuration;
private FileSystem fs;
private ObjectInspector inspector;
@@ -53,7 +53,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
private PrintStream dummyStream = null;
private boolean oldStyle = false;
private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id
-
+ //unique within a transaction
+ private int statementId = 0;
+ private Path finalDestination;
/**
* Create the options object.
* @param conf Use the given configuration
@@ -63,6 +65,18 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
}
/**
+ * shallow clone
+ */
+ @Override
+ public Options clone() {
+ try {
+ return (Options)super.clone();
+ }
+ catch(CloneNotSupportedException ex) {
+ throw new RuntimeException("clone() not properly implemented: " + ex.getMessage(), ex);
+ }
+ }
+ /**
* Use the given ObjectInspector for each record written.
* @param inspector the inspector to use.
* @return this
@@ -185,6 +199,31 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
return this;
}
+ /**
+ * @since 1.3.0
+ * This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names.
+ * This is primarily needed for testing to make sure 1.3 code can still read files created
+ * by older code. Also used by the Compactor.
+ */
+ public Options statementId(int id) {
+ if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) {
+ throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId);
+ }
+ if(id < -1) {
+ throw new IllegalArgumentException("Illegal statementId value: " + id);
+ }
+ this.statementId = id;
+ return this;
+ }
+ /**
+ * @param p where the data for this operation will eventually end up;
+ * basically table or partition directory in FS
+ */
+ public Options finalDestination(Path p) {
+ this.finalDestination = p;
+ return this;
+ }
+
public Configuration getConfiguration() {
return configuration;
}
@@ -236,6 +275,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
boolean getOldStyle() {
return oldStyle;
}
+ public int getStatementId() {
+ return statementId;
+ }
+ public Path getFinalDestination() {
+ return finalDestination;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2214733..c7e0780 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -67,6 +67,15 @@ public class AcidUtils {
};
public static final String BUCKET_DIGITS = "%05d";
public static final String DELTA_DIGITS = "%07d";
+ /**
+ * 10K statements per tx. Probably overkill ... since that many delta files
+ * would not be good for performance
+ */
+ public static final String STATEMENT_DIGITS = "%04d";
+ /**
+ * This must be in sync with {@link #STATEMENT_DIGITS}
+ */
+ public static final int MAX_STATEMENTS_PER_TXN = 10000;
public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
public static final PathFilter originalBucketFilter = new PathFilter() {
@@ -79,7 +88,7 @@ public class AcidUtils {
private AcidUtils() {
// NOT USED
}
- private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
+ private static final Log LOG = LogFactory.getLog(AcidUtils.class);
private static final Pattern ORIGINAL_PATTERN =
Pattern.compile("[0-9]+_[0-9]+");
@@ -104,12 +113,23 @@ public class AcidUtils {
BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
}
- private static String deltaSubdir(long min, long max) {
+ /**
+ * This is format of delta dir name prior to Hive 1.3.x
+ */
+ public static String deltaSubdir(long min, long max) {
return DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
String.format(DELTA_DIGITS, max);
}
/**
+ * Each write statement in a transaction creates its own delta dir.
+ * @since 1.3.x
+ */
+ public static String deltaSubdir(long min, long max, int statementId) {
+ return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
+ }
+
+ /**
* Create a filename for a bucket file.
* @param directory the partition directory
* @param options the options for writing the bucket
@@ -124,9 +144,15 @@ public class AcidUtils {
} else if (options.isWritingBase()) {
subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
options.getMaximumTransactionId());
+ } else if(options.getStatementId() == -1) {
+ //when minor compaction runs, we collapse per statement delta files inside a single
+ //transaction so we no longer need a statementId in the file name
+ subdir = deltaSubdir(options.getMinimumTransactionId(),
+ options.getMaximumTransactionId());
} else {
subdir = deltaSubdir(options.getMinimumTransactionId(),
- options.getMaximumTransactionId());
+ options.getMaximumTransactionId(),
+ options.getStatementId());
}
return createBucketFile(new Path(directory, subdir), options.getBucket());
}
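
For concreteness, the two deltaSubdir() overloads above produce names like these (DELTA_DIGITS
is "%07d", STATEMENT_DIGITS is "%04d"; the "delta_" prefix is not shown in this hunk but matches
the test expectations later in this diff):

    AcidUtils.deltaSubdir(100, 200);      // "delta_0000100_0000200"       pre-1.3.x / compacted
    AcidUtils.deltaSubdir(100, 200, 7);   // "delta_0000100_0000200_0007"  1.3.x, per statement

createFilename() chooses between them: statementId == -1 (as the compactor sets) yields the old
two-part name, any other value appends the statement suffix.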
@@ -214,14 +240,24 @@ public class AcidUtils {
}
public static class ParsedDelta implements Comparable<ParsedDelta> {
- final long minTransaction;
- final long maxTransaction;
- final FileStatus path;
+ private final long minTransaction;
+ private final long maxTransaction;
+ private final FileStatus path;
+ //-1 is for internal (getAcidState()) purposes and means the delta dir
+ //had no statement ID
+ private final int statementId;
+ /**
+ * for pre 1.3.x delta files
+ */
ParsedDelta(long min, long max, FileStatus path) {
+ this(min, max, path, -1);
+ }
+ ParsedDelta(long min, long max, FileStatus path, int statementId) {
this.minTransaction = min;
this.maxTransaction = max;
this.path = path;
+ this.statementId = statementId;
}
public long getMinTransaction() {
@@ -236,6 +272,16 @@ public class AcidUtils {
return path.getPath();
}
+ public int getStatementId() {
+ return statementId == -1 ? 0 : statementId;
+ }
+
+ /**
+ * Compactions (Major/Minor) merge deltas/bases, but deletion of old files
+ * happens in a different process; thus it's possible to have bases/deltas with
+ * overlapping txnId boundaries. The sort order helps figure out the "best" set of files
+ * to use to get data.
+ */
@Override
public int compareTo(ParsedDelta parsedDelta) {
if (minTransaction != parsedDelta.minTransaction) {
@@ -250,7 +296,22 @@ public class AcidUtils {
} else {
return -1;
}
- } else {
+ }
+ else if(statementId != parsedDelta.statementId) {
+ /**
+ * We want deltas after minor compaction (w/o statementId) to sort
+ * earlier, so that getAcidState() considers the files that were compacted into larger ones obsolete.
+ * Before compaction, include deltas with all statementIds for a given txnId
+ * in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory}
+ */
+ if(statementId < parsedDelta.statementId) {
+ return -1;
+ }
+ else {
+ return 1;
+ }
+ }
+ else {
return path.compareTo(parsedDelta.path);
}
}
@@ -271,46 +332,72 @@ public class AcidUtils {
/**
* Convert the list of deltas into an equivalent list of begin/end
- * transaction id pairs.
+ * transaction id pairs. Assumes {@code deltas} is sorted.
* @param deltas
* @return the list of transaction ids to serialize
*/
- public static List<Long> serializeDeltas(List<ParsedDelta> deltas) {
- List<Long> result = new ArrayList<Long>(deltas.size() * 2);
- for(ParsedDelta delta: deltas) {
- result.add(delta.minTransaction);
- result.add(delta.maxTransaction);
+ public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) {
+ List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size());
+ AcidInputFormat.DeltaMetaData last = null;
+ for(ParsedDelta parsedDelta : deltas) {
+ if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) {
+ last.getStmtIds().add(parsedDelta.getStatementId());
+ continue;
+ }
+ last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList<Integer>());
+ result.add(last);
+ if(parsedDelta.statementId >= 0) {
+ last.getStmtIds().add(parsedDelta.getStatementId());
+ }
}
return result;
}
/**
* Convert the list of begin/end transaction id pairs to a list of delta
- * directories.
+ * directories. Note that there may be multiple delta files for the exact same txn range starting
+ * with 1.3.x;
+ * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
* @param root the root directory
* @param deltas list of begin/end transaction id pairs
* @return the list of delta paths
*/
- public static Path[] deserializeDeltas(Path root, List<Long> deltas) {
- int deltaSize = deltas.size() / 2;
- Path[] result = new Path[deltaSize];
- for(int i = 0; i < deltaSize; ++i) {
- result[i] = new Path(root, deltaSubdir(deltas.get(i * 2),
- deltas.get(i * 2 + 1)));
+ public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
+ List<Path> results = new ArrayList<Path>(deltas.size());
+ for(AcidInputFormat.DeltaMetaData dmd : deltas) {
+ if(dmd.getStmtIds().isEmpty()) {
+ results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
+ continue;
+ }
+ for(Integer stmtId : dmd.getStmtIds()) {
+ results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
+ }
}
- return result;
+ return results.toArray(new Path[results.size()]);
}
- static ParsedDelta parseDelta(FileStatus path) {
- String filename = path.getPath().getName();
+ private static ParsedDelta parseDelta(FileStatus path) {
+ ParsedDelta p = parsedDelta(path.getPath());
+ return new ParsedDelta(p.getMinTransaction(),
+ p.getMaxTransaction(), path, p.statementId);
+ }
+ public static ParsedDelta parsedDelta(Path deltaDir) {
+ String filename = deltaDir.getName();
if (filename.startsWith(DELTA_PREFIX)) {
String rest = filename.substring(DELTA_PREFIX.length());
int split = rest.indexOf('_');
+ int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
long min = Long.parseLong(rest.substring(0, split));
- long max = Long.parseLong(rest.substring(split + 1));
- return new ParsedDelta(min, max, path);
+ long max = split2 == -1 ?
+ Long.parseLong(rest.substring(split + 1)) :
+ Long.parseLong(rest.substring(split + 1, split2));
+ if(split2 == -1) {
+ return new ParsedDelta(min, max, null);
+ }
+ int statementId = Integer.parseInt(rest.substring(split2 + 1));
+ return new ParsedDelta(min, max, null, statementId);
}
- throw new IllegalArgumentException(path + " does not start with " +
+ throw new IllegalArgumentException(deltaDir + " does not start with " +
DELTA_PREFIX);
}
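
A short sketch of how the two directory-name formats parse through the new public parsedDelta()
(Path is org.apache.hadoop.fs.Path):

    AcidUtils.ParsedDelta pre13 = AcidUtils.parsedDelta(new Path("/tbl/part1/delta_0000062_0000062"));
    // pre13.getStatementId() == 0   (stored internally as -1: no statement id in the name)

    AcidUtils.ParsedDelta post13 = AcidUtils.parsedDelta(new Path("/tbl/part1/delta_0000062_0000062_0003"));
    // post13.getStatementId() == 3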
@@ -407,15 +494,24 @@ public class AcidUtils {
Collections.sort(working);
long current = bestBaseTxn;
+ int lastStmtId = -1;
for(ParsedDelta next: working) {
if (next.maxTransaction > current) {
// are any of the new transactions ones that we care about?
if (txnList.isTxnRangeValid(current+1, next.maxTransaction) !=
- ValidTxnList.RangeResponse.NONE) {
+ ValidTxnList.RangeResponse.NONE) {
deltas.add(next);
current = next.maxTransaction;
+ lastStmtId = next.statementId;
}
- } else {
+ }
+ else if(next.maxTransaction == current && lastStmtId >= 0) {
+ //make sure to get all deltas within a single transaction; multi-statement txns
+ //generate multiple delta files with the same txnId range
+ //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
+ deltas.add(next);
+ }
+ else {
obsolete.add(next.path);
}
}
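
As a worked example of the serializeDeltas()/deserializeDeltas() pair above (directory names
hypothetical): three sorted per-statement deltas collapse into two DeltaMetaData entries and
expand back to the same per-statement paths under the given root.

    delta_0000062_0000062_0000, delta_0000062_0000062_0003  ->  DeltaMetaData(min=62, max=62, stmtIds=[0, 3])
    delta_0000063_0000063_0000                               ->  DeltaMetaData(min=63, max=63, stmtIds=[0])

A pre-1.3.x delta such as delta_0000040_0000060 carries an empty stmtIds list and deserializes
back to the plain delta_0000040_0000060 name.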
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 7ad5aa0..50ba740 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -297,31 +297,32 @@ public final class HiveFileFormatUtils {
// TODO not 100% sure about this. This call doesn't set the compression type in the conf
// file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not
// sure if this is correct or not.
- return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(),
- bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum);
+ return getRecordUpdater(jc, acidOutputFormat,
+ bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf);
}
private static RecordUpdater getRecordUpdater(JobConf jc,
AcidOutputFormat<?, ?> acidOutputFormat,
- boolean isCompressed,
- long txnId,
int bucket,
ObjectInspector inspector,
Properties tableProp,
Path outPath,
Reporter reporter,
- int rowIdColNum) throws IOException {
+ int rowIdColNum,
+ FileSinkDesc conf) throws IOException {
return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
- .isCompressed(isCompressed)
+ .isCompressed(conf.getCompressed())
.tableProperties(tableProp)
.reporter(reporter)
.writingBase(false)
- .minimumTransactionId(txnId)
- .maximumTransactionId(txnId)
+ .minimumTransactionId(conf.getTransactionId())
+ .maximumTransactionId(conf.getTransactionId())
.bucket(bucket)
.inspector(inspector)
- .recordIdColumn(rowIdColNum));
+ .recordIdColumn(rowIdColNum)
+ .statementId(conf.getStatementId())
+ .finalDestination(conf.getDestPath()));
}
public static PartitionDesc getPartitionDescFromPathRecursively(
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8864013..3a9e64e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -439,13 +439,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final FileStatus file;
private final FileInfo fileInfo;
private final boolean isOriginal;
- private final List<Long> deltas;
+ private final List<DeltaMetaData> deltas;
private final boolean hasBase;
SplitInfo(Context context, FileSystem fs,
FileStatus file, FileInfo fileInfo,
boolean isOriginal,
- List<Long> deltas,
+ List<DeltaMetaData> deltas,
boolean hasBase, Path dir, boolean[] covered) throws IOException {
super(dir, context.numBuckets, deltas, covered);
this.context = context;
@@ -467,12 +467,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
FileSystem fs;
List<FileStatus> files;
boolean isOriginal;
- List<Long> deltas;
+ List<DeltaMetaData> deltas;
Path dir;
boolean[] covered;
public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
- boolean isOriginal, List<Long> deltas, boolean[] covered) {
+ boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) {
this.context = context;
this.dir = dir;
this.fs = fs;
@@ -543,14 +543,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
static final class BISplitStrategy extends ACIDSplitStrategy {
List<FileStatus> fileStatuses;
boolean isOriginal;
- List<Long> deltas;
+ List<DeltaMetaData> deltas;
FileSystem fs;
Context context;
Path dir;
public BISplitStrategy(Context context, FileSystem fs,
Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
- List<Long> deltas, boolean[] covered) {
+ List<DeltaMetaData> deltas, boolean[] covered) {
super(dir, context.numBuckets, deltas, covered);
this.context = context;
this.fileStatuses = fileStatuses;
@@ -587,11 +587,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
*/
static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
Path dir;
- List<Long> deltas;
+ List<DeltaMetaData> deltas;
boolean[] covered;
int numBuckets;
- public ACIDSplitStrategy(Path dir, int numBuckets, List<Long> deltas, boolean[] covered) {
+ public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) {
this.dir = dir;
this.numBuckets = numBuckets;
this.deltas = deltas;
@@ -640,7 +640,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final SplitStrategy splitStrategy;
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
context.conf, context.transactionList);
- List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+ List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
Path base = dirInfo.getBaseDirectory();
List<FileStatus> original = dirInfo.getOriginalFiles();
boolean[] covered = new boolean[context.numBuckets];
@@ -718,7 +718,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private Metadata metadata;
private List<OrcProto.Type> types;
private final boolean isOriginal;
- private final List<Long> deltas;
+ private final List<DeltaMetaData> deltas;
private final boolean hasBase;
private OrcFile.WriterVersion writerVersion;
private long projColsUncompressedSize;
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
index da23544..b58c880 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -37,7 +38,7 @@ public class OrcNewSplit extends FileSplit {
private boolean hasFooter;
private boolean isOriginal;
private boolean hasBase;
- private final List<Long> deltas = new ArrayList<Long>();
+ private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
private OrcFile.WriterVersion writerVersion;
protected OrcNewSplit(){
@@ -67,8 +68,8 @@ public class OrcNewSplit extends FileSplit {
(hasFooter ? OrcSplit.FOOTER_FLAG : 0);
out.writeByte(flags);
out.writeInt(deltas.size());
- for(Long delta: deltas) {
- out.writeLong(delta);
+ for(AcidInputFormat.DeltaMetaData delta: deltas) {
+ delta.write(out);
}
if (hasFooter) {
// serialize FileMetaInfo fields
@@ -101,7 +102,9 @@ public class OrcNewSplit extends FileSplit {
deltas.clear();
int numDeltas = in.readInt();
for(int i=0; i < numDeltas; i++) {
- deltas.add(in.readLong());
+ AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
+ dmd.readFields(in);
+ deltas.add(dmd);
}
if (hasFooter) {
// deserialize FileMetaInfo fields
@@ -137,7 +140,7 @@ public class OrcNewSplit extends FileSplit {
return hasBase;
}
- public List<Long> getDeltas() {
+ public List<AcidInputFormat.DeltaMetaData> getDeltas() {
return deltas;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 728118a..2f11611 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -72,41 +72,55 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
/**
* A RecordIdentifier extended with the current transaction id. This is the
* key of our merge sort with the originalTransaction, bucket, and rowId
- * ascending and the currentTransaction descending. This means that if the
+ * ascending and the currentTransaction, statementId descending. This means that if the
* reader is collapsing events to just the last update, just the first
* instance of each record is required.
*/
final static class ReaderKey extends RecordIdentifier{
private long currentTransactionId;
+ private int statementId;//sort on this descending, like currentTransactionId
public ReaderKey() {
- this(-1, -1, -1, -1);
+ this(-1, -1, -1, -1, 0);
}
public ReaderKey(long originalTransaction, int bucket, long rowId,
long currentTransactionId) {
+ this(originalTransaction, bucket, rowId, currentTransactionId, 0);
+ }
+ /**
+ * @param statementId - set this to 0 if N/A
+ */
+ public ReaderKey(long originalTransaction, int bucket, long rowId,
+ long currentTransactionId, int statementId) {
super(originalTransaction, bucket, rowId);
this.currentTransactionId = currentTransactionId;
+ this.statementId = statementId;
}
@Override
public void set(RecordIdentifier other) {
super.set(other);
currentTransactionId = ((ReaderKey) other).currentTransactionId;
+ statementId = ((ReaderKey) other).statementId;
}
public void setValues(long originalTransactionId,
int bucket,
long rowId,
- long currentTransactionId) {
+ long currentTransactionId,
+ int statementId) {
setValues(originalTransactionId, bucket, rowId);
this.currentTransactionId = currentTransactionId;
+ this.statementId = statementId;
}
@Override
public boolean equals(Object other) {
return super.equals(other) &&
- currentTransactionId == ((ReaderKey) other).currentTransactionId;
+ currentTransactionId == ((ReaderKey) other).currentTransactionId
+ && statementId == ((ReaderKey) other).statementId//consistent with compareTo()
+ ;
}
@Override
@@ -118,6 +132,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if (currentTransactionId != oth.currentTransactionId) {
return currentTransactionId < oth.currentTransactionId ? +1 : -1;
}
+ if(statementId != oth.statementId) {
+ return statementId < oth.statementId ? +1 : -1;
+ }
} else {
return -1;
}
@@ -125,6 +142,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return sup;
}
+ /**
+ * This means 1 txn modified the same row more than once
+ */
+ private boolean isSameRow(ReaderKey other) {
+ return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
+ }
+
public long getCurrentTransactionId() {
return currentTransactionId;
}
@@ -142,7 +166,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
public String toString() {
return "{originalTxn: " + getTransactionId() + ", bucket: " +
getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
- currentTransactionId + "}";
+ currentTransactionId + ", statementId: "+ statementId + "}";
}
}
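
A small sketch of the resulting ordering (ReaderKey is package-private, so same-package access
is assumed): two events for the same row written by different statements of the same transaction
sort with the later statement first, so a collapsing reader keeps that statement's version.

    OrcRawRecordMerger.ReaderKey first  = new OrcRawRecordMerger.ReaderKey(1L, 0, 5L, 10L, 1);
    OrcRawRecordMerger.ReaderKey second = new OrcRawRecordMerger.ReaderKey(1L, 0, 5L, 10L, 2);
    assert second.compareTo(first) < 0;   // statementId sorts descending, like currentTransactionId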
@@ -159,6 +183,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
final ReaderKey key;
final RecordIdentifier maxKey;
final int bucket;
+ private final int statementId;
/**
* Create a reader that reads from the first key larger than minKey to any
@@ -170,17 +195,19 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* @param maxKey only return keys less than or equal to maxKey if it is
* non-null
* @param options options to provide to read the rows.
+ * @param statementId id of SQL statement within a transaction
* @throws IOException
*/
ReaderPair(ReaderKey key, Reader reader, int bucket,
RecordIdentifier minKey, RecordIdentifier maxKey,
- ReaderImpl.Options options) throws IOException {
+ ReaderImpl.Options options, int statementId) throws IOException {
this.reader = reader;
this.key = key;
this.maxKey = maxKey;
this.bucket = bucket;
// TODO use stripe statistics to jump over stripes
recordReader = reader.rowsOptions(options);
+ this.statementId = statementId;
// advance the reader until we reach the minimum key
do {
next(nextRecord);
@@ -195,7 +222,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
OrcRecordUpdater.getBucket(nextRecord),
OrcRecordUpdater.getRowId(nextRecord),
- OrcRecordUpdater.getCurrentTransaction(nextRecord));
+ OrcRecordUpdater.getCurrentTransaction(nextRecord),
+ statementId);
// if this record is larger than maxKey, we need to stop
if (maxKey != null && key.compareRow(maxKey) > 0) {
@@ -223,7 +251,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
RecordIdentifier minKey, RecordIdentifier maxKey,
Reader.Options options) throws IOException {
- super(key, reader, bucket, minKey, maxKey, options);
+ super(key, reader, bucket, minKey, maxKey, options, 0);
}
@Override
@@ -263,7 +291,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
nextRecord.setFieldValue(OrcRecordUpdater.ROW,
recordReader.next(OrcRecordUpdater.getRow(next)));
}
- key.setValues(0L, bucket, nextRowId, 0L);
+ key.setValues(0L, bucket, nextRowId, 0L, 0);
if (maxKey != null && key.compareRow(maxKey) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("key " + key + " > maxkey " + maxKey);
@@ -415,7 +443,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.offset = options.getOffset();
this.length = options.getLength();
this.validTxnList = validTxnList;
- // modify the optins to reflect the event instead of the base row
+ // modify the options to reflect the event instead of the base row
Reader.Options eventOptions = createEventOptions(options);
if (reader == null) {
baseReader = null;
@@ -438,7 +466,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
options);
} else {
pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
- eventOptions);
+ eventOptions, 0);
}
// if there is at least one record, put it in the map
@@ -458,13 +486,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
for(Path delta: deltaDirectory) {
ReaderKey key = new ReaderKey();
Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
+ AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
FileSystem fs = deltaFile.getFileSystem(conf);
long length = getLastFlushLength(fs, deltaFile);
if (length != -1 && fs.exists(deltaFile)) {
Reader deltaReader = OrcFile.createReader(deltaFile,
OrcFile.readerOptions(conf).maxLength(length));
ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
- maxKey, eventOptions);
+ maxKey, eventOptions, deltaDir.getStatementId());
if (deltaPair.nextRecord != null) {
readers.put(key, deltaPair);
}
@@ -580,9 +609,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
continue;
}
+ /*for multi-statement txns, you may have multiple events for the same
+ * row in the same (current) transaction. We want to collapse these to just the last one
+ * regardless of whether we are minor compacting. Consider INSERT/UPDATE/UPDATE of the
+ * same row in the same txn. There is no benefit in passing along anything except the last
+ * event. If we did want to pass it along, we'd have to include statementId in the row
+ * returned so that compaction could write it out, or make minor compaction understand
+ * how to write out delta files in delta_xxx_yyy_stid format. There doesn't seem to be any
+ * value in this.*/
+ boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier);
// if we are collapsing, figure out if this is a new row
- if (collapse) {
- keysSame = prevKey.compareRow(recordIdentifier) == 0;
+ if (collapse || isSameRow) {
+ keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow);
if (!keysSame) {
prevKey.set(recordIdentifier);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index b576496..e4651b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -89,6 +89,7 @@ public class OrcRecordUpdater implements RecordUpdater {
private final IntWritable bucket = new IntWritable();
private final LongWritable rowId = new LongWritable();
private long insertedRows = 0;
+ private long rowIdOffset = 0;
// This records how many rows have been inserted or deleted. It is separate from insertedRows
// because that is monotonically increasing to give new unique row ids.
private long rowCountDelta = 0;
@@ -263,6 +264,41 @@ public class OrcRecordUpdater implements RecordUpdater {
item.setFieldValue(ROW_ID, rowId);
}
+ /**
+ * To handle multiple INSERT... statements in a single transaction, we want to make sure
+ * to generate unique {@code rowId} for all inserted rows of the transaction.
+ * @return largest rowId created by previous statements (may be 0)
+ * @throws IOException
+ */
+ private long findRowIdOffsetForInsert() throws IOException {
+ /*
+ * 1. need to know bucket we are writing to
+ * 2. need to know which delta dir it's in
+ * Then,
+ * 1. find the same bucket file in previous delta dir for this txn
+ * 2. read the footer and get AcidStats which has insert count
+ * 2.1 if AcidStats.inserts>0 done
+ * else go to previous delta file
+ * For example, consider insert/update/insert case...*/
+ if(options.getStatementId() <= 0) {
+ return 0;//there is only 1 statement in this transaction (so far)
+ }
+ for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
+ Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
+ if(!fs.exists(matchingBucket)) {
+ continue;
+ }
+ Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration()));
+ //no close() on Reader?!
+ AcidStats acidStats = parseAcidStats(reader);
+ if(acidStats.inserts > 0) {
+ return acidStats.inserts;
+ }
+ }
+ //if we got here, we looked at all delta files in this txn, prior to current statement and didn't
+ //find any inserts...
+ return 0;
+ }
// Find the record identifier column (if there) and return a possibly new ObjectInspector that
// will strain out the record id for the underlying writer.
private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
@@ -304,6 +340,9 @@ public class OrcRecordUpdater implements RecordUpdater {
recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField));
}
+ else if(operation == INSERT_OPERATION) {
+ rowId += rowIdOffset;
+ }
this.rowId.set(rowId);
this.originalTransaction.set(originalTransaction);
item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
@@ -315,6 +354,9 @@ public class OrcRecordUpdater implements RecordUpdater {
public void insert(long currentTransaction, Object row) throws IOException {
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
+ //this method is almost a no-op in the hcatalog.streaming case since statementId == 0 is
+ //always true in that case
+ rowIdOffset = findRowIdOffsetForInsert();
}
addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
rowCountDelta++;
@@ -407,6 +449,22 @@ public class OrcRecordUpdater implements RecordUpdater {
}
return result;
}
+ /**
+ * {@link KeyIndexBuilder} creates these
+ */
+ static AcidStats parseAcidStats(Reader reader) {
+ String statsSerialized;
+ try {
+ ByteBuffer val =
+ reader.getMetadataValue(OrcRecordUpdater.ACID_STATS)
+ .duplicate();
+ statsSerialized = utf8Decoder.decode(val).toString();
+ } catch (CharacterCodingException e) {
+ throw new IllegalArgumentException("Bad string encoding for " +
+ OrcRecordUpdater.ACID_STATS, e);
+ }
+ return new AcidStats(statsSerialized);
+ }
static class KeyIndexBuilder implements OrcFile.WriterCallback {
StringBuilder lastKey = new StringBuilder();
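
To make the rowIdOffset/findRowIdOffsetForInsert() logic above concrete, a hypothetical bucket
written by a two-statement transaction (paths and counts invented for illustration):

    delta_0000017_0000017_0000/bucket_00000   statement 0 wrote 3 inserts -> rowIds 0, 1, 2
                                              (its footer's AcidStats.inserts == 3)
    delta_0000017_0000017_0001/bucket_00000   statement 1's first insert() calls
                                              findRowIdOffsetForInsert(), which reads 3 from the
                                              previous delta, so its rows get rowIds 3, 4, ...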
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 0c7dd40..8cf4cc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.FileSplit;
@@ -41,7 +43,7 @@ public class OrcSplit extends FileSplit {
private boolean hasFooter;
private boolean isOriginal;
private boolean hasBase;
- private final List<Long> deltas = new ArrayList<Long>();
+ private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
private OrcFile.WriterVersion writerVersion;
private long projColsUncompressedSize;
@@ -58,7 +60,7 @@ public class OrcSplit extends FileSplit {
public OrcSplit(Path path, long offset, long length, String[] hosts,
ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
- List<Long> deltas, long projectedDataSize) {
+ List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
super(path, offset, length, hosts);
this.fileMetaInfo = fileMetaInfo;
hasFooter = this.fileMetaInfo != null;
@@ -78,8 +80,8 @@ public class OrcSplit extends FileSplit {
(hasFooter ? FOOTER_FLAG : 0);
out.writeByte(flags);
out.writeInt(deltas.size());
- for(Long delta: deltas) {
- out.writeLong(delta);
+ for(AcidInputFormat.DeltaMetaData delta: deltas) {
+ delta.write(out);
}
if (hasFooter) {
// serialize FileMetaInfo fields
@@ -112,7 +114,9 @@ public class OrcSplit extends FileSplit {
deltas.clear();
int numDeltas = in.readInt();
for(int i=0; i < numDeltas; i++) {
- deltas.add(in.readLong());
+ AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
+ dmd.readFields(in);
+ deltas.add(dmd);
}
if (hasFooter) {
// deserialize FileMetaInfo fields
@@ -148,7 +152,7 @@ public class OrcSplit extends FileSplit {
return hasBase;
}
- public List<Long> getDeltas() {
+ public List<AcidInputFormat.DeltaMetaData> getDeltas() {
return deltas;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index f8fff1a..445f606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -52,6 +52,14 @@ public class DbTxnManager extends HiveTxnManagerImpl {
private DbLockManager lockMgr = null;
private IMetaStoreClient client = null;
private long txnId = 0;
+ /**
+ * assigns a unique monotonically increasing ID to each statement
+ * which is part of an open transaction. This is used by the storage
+ * layer (see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)})
+ * to keep apart multiple writes of the same data within the same transaction.
+ * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
+ */
+ private int statementId = -1;
DbTxnManager() {
}
@@ -69,6 +77,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
init();
try {
txnId = client.openTxn(user);
+ statementId = 0;
LOG.debug("Opened txn " + txnId);
return txnId;
} catch (TException e) {
@@ -222,7 +231,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
return null;
}
- List<HiveLock> locks = new ArrayList<HiveLock>(1);
+ List<HiveLock> locks = new ArrayList<HiveLock>(1);
+ if(txnId > 0) {
+ statementId++;
+ }
LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks);
ctx.setHiveLocks(locks);
return lockState;
@@ -249,6 +261,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
+ statementId = -1;
}
}
@@ -270,6 +283,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
+ statementId = -1;
}
}
@@ -361,5 +375,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
}
}
+ @Override
+ public int getStatementId() {
+ return statementId;
+ }
}
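
A rough sketch of the statementId lifecycle wired up above (openTxn/acquireLocks/commitTxn names
are taken from HiveTxnManager; the surrounding setup is assumed and not part of this diff):

    HiveTxnManager txnMgr = ...;             // a DbTxnManager in an ACID-enabled session
    long txnId = txnMgr.openTxn("hive");     // statementId = 0
    txnMgr.acquireLocks(plan, ctx, "hive");  // txnId > 0, so statementId++
    int stmtId = txnMgr.getStatementId();    // Driver stamps this onto each acid FileSinkDesc
    txnMgr.commitTxn();                      // statementId reset to -1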
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 21ab8ee..1906982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -54,6 +54,10 @@ class DummyTxnManager extends HiveTxnManagerImpl {
}
@Override
+ public int getStatementId() {
+ return 0;
+ }
+ @Override
public HiveLockManager getLockManager() throws LockException {
if (lockMgr == null) {
boolean supportConcurrency =
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 2dd0c7d..6c3dc33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -127,4 +127,7 @@ public interface HiveTxnManager {
* @return true if this transaction manager does ACID
*/
boolean supportsAcid();
+
+ int getStatementId();
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b02374e..8516631 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6605,7 +6605,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
rsCtx.getNumFiles(),
rsCtx.getTotalFiles(),
rsCtx.getPartnCols(),
- dpCtx);
+ dpCtx,
+ dest_path);
// If this is an insert, update, or delete on an ACID table then mark that so the
// FileSinkOperator knows how to properly write to it.
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index bb6cee5..f73b502 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -92,16 +92,21 @@ public class FileSinkDesc extends AbstractOperatorDesc {
// Record what type of write this is. Default is non-ACID (ie old style).
private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
private long txnId = 0; // transaction id for this operation
+ private int statementId = -1;
private transient Table table;
+ private Path destPath;
public FileSinkDesc() {
}
+ /**
+ * @param destPath - the final destination for data
+ */
public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
final boolean compressed, final int destTableId, final boolean multiFileSpray,
final boolean canBeMerged, final int numFiles, final int totalFiles,
- final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx) {
+ final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) {
this.dirName = dirName;
this.tableInfo = tableInfo;
@@ -114,6 +119,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
this.partitionCols = partitionCols;
this.dpCtx = dpCtx;
this.dpSortState = DPSortState.NONE;
+ this.destPath = destPath;
}
public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -135,7 +141,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
public Object clone() throws CloneNotSupportedException {
FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
- partitionCols, dpCtx);
+ partitionCols, dpCtx, destPath);
ret.setCompressCodec(compressCodec);
ret.setCompressType(compressType);
ret.setGatherStats(gatherStats);
@@ -231,9 +237,6 @@ public class FileSinkDesc extends AbstractOperatorDesc {
return temporary;
}
- /**
- * @param totalFiles the totalFiles to set
- */
public void setTemporary(boolean temporary) {
this.temporary = temporary;
}
@@ -438,11 +441,23 @@ public class FileSinkDesc extends AbstractOperatorDesc {
public void setTransactionId(long id) {
txnId = id;
}
-
public long getTransactionId() {
return txnId;
}
+ public void setStatementId(int id) {
+ statementId = id;
+ }
+ /**
+ * See {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options#statementId(int)}
+ */
+ public int getStatementId() {
+ return statementId;
+ }
+ public Path getDestPath() {
+ return destPath;
+ }
+
public Table getTable() {
return table;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c5f2d4d..6c77ba4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -545,7 +545,9 @@ public class CompactorMR {
.reporter(reporter)
.minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
.maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
- .bucket(bucket);
+ .bucket(bucket)
+ .statementId(-1);//setting statementId == -1 makes compacted delta files use
+ //delta_xxxx_yyyy format
// Instantiate the underlying output format
@SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index e400778..c6ae030 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -303,7 +303,8 @@ public class TestFileSinkOperator {
Map<String, String> partColNames = new HashMap<String, String>(1);
partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
dpCtx.setInputToDPCols(partColNames);
- desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx);
+ //todo: does this need the finalDestination?
+ desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
} else {
desc = new FileSinkDesc(basePath, tableDesc, false);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 1e3df34..f8ded12 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -46,17 +46,23 @@ public class TestAcidUtils {
AcidUtils.createFilename(p, options).toString());
options.bucket(123);
assertEquals("/tmp/00123_0",
- AcidUtils.createFilename(p, options).toString());
+ AcidUtils.createFilename(p, options).toString());
options.bucket(23)
.minimumTransactionId(100)
.maximumTransactionId(200)
.writingBase(true)
.setOldStyle(false);
assertEquals("/tmp/base_0000200/bucket_00023",
- AcidUtils.createFilename(p, options).toString());
+ AcidUtils.createFilename(p, options).toString());
options.writingBase(false);
+ assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
+ options.statementId(-1);
assertEquals("/tmp/delta_0000100_0000200/bucket_00023",
- AcidUtils.createFilename(p, options).toString());
+ AcidUtils.createFilename(p, options).toString());
+ options.statementId(7);
+ assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
}
@Test
@@ -236,7 +242,6 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
- new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
Path part = new MockPath(fs, "mock:/tbl/part1");
AcidUtils.Directory dir =
@@ -254,6 +259,45 @@ public class TestAcidUtils {
assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString());
}
+ /**
+ * Hive 1.3.0 delta dir naming scheme which supports multi-statement txns
+ * @throws Exception
+ */
+ @Test
+ public void testOverlapingDelta2() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_0000063_63_0/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_000062_62_0/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_000062_62_3/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_00061_61_0/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_0060_60_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_0060_60_4/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_0060_60_7/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_058_58/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+ assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
+ List<FileStatus> obsolete = dir.getObsolete();
+ assertEquals(5, obsolete.size());
+ assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString());
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(5, delts.size());
+ assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_00061_61_0", delts.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_000062_62_0", delts.get(2).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_000062_62_3", delts.get(3).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0000063_63_0", delts.get(4).getPath().toString());
+ }
+
@Test
public void deltasWithOpenTxnInRead() throws Exception {
Configuration conf = new Configuration();
@@ -268,6 +312,27 @@ public class TestAcidUtils {
assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
}
+ /**
+ * @since 1.3.0
+ * @throws Exception
+ */
+ @Test
+ public void deltasWithOpenTxnInRead2() throws Exception {
+ Configuration conf = new Configuration();
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(2, delts.size());
+ assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
+ }
+
@Test
public void deltasWithOpenTxnsNotInCompact() throws Exception {
Configuration conf = new Configuration();
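For readers verifying the expected paths in the assertions above: the delta directory names follow a fixed zero-padded pattern, and this commit adds a third, statement-id component. A minimal sketch of the two schemes (not part of the commit; the padding widths are taken from expected paths such as delta_0000100_0000200_0007, and the class and method names below are illustrative only):

// Illustrative sketch of the two delta directory naming schemes exercised above.
// Pre-1.3.0: delta_<minTxn>_<maxTxn>; Hive 1.3.0+: delta_<minTxn>_<maxTxn>_<stmtId>.
public final class DeltaNameSketch {
  static String oldStyle(long minTxn, long maxTxn) {
    return String.format("delta_%07d_%07d", minTxn, maxTxn);
  }
  static String hive130Style(long minTxn, long maxTxn, int statementId) {
    return oldStyle(minTxn, maxTxn) + String.format("_%04d", statementId);
  }
  public static void main(String[] args) {
    System.out.println(oldStyle(100, 200));         // delta_0000100_0000200
    System.out.println(hive130Style(100, 200, 7));  // delta_0000100_0000200_0007
  }
}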
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 56e5f9f..e96ab2a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -927,7 +928,7 @@ public class TestInputOutputFormat {
OrcInputFormat.SplitGenerator splitter =
new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
fs.getFileStatus(new Path("/a/file")), null, true,
- new ArrayList<Long>(), true, null, null));
+ new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
OrcSplit result = splitter.createSplit(0, 200, null);
assertEquals(0, result.getStart());
assertEquals(200, result.getLength());
@@ -968,7 +969,7 @@ public class TestInputOutputFormat {
OrcInputFormat.SplitGenerator splitter =
new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
fs.getFileStatus(new Path("/a/file")), null, true,
- new ArrayList<Long>(), true, null, null));
+ new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
List<OrcSplit> results = splitter.call();
OrcSplit result = results.get(0);
assertEquals(3, result.getStart());
@@ -990,7 +991,7 @@ public class TestInputOutputFormat {
conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
context = new OrcInputFormat.Context(conf);
splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
- fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<Long>(),
+ fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(),
true, null, null));
results = splitter.call();
for(int i=0; i < stripeSizes.length; ++i) {
@@ -1497,7 +1498,7 @@ public class TestInputOutputFormat {
Path partDir = new Path(conf.get("mapred.input.dir"));
OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
- .writingBase(true).bucket(0).inspector(inspector));
+ .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir));
for(int i=0; i < 100; ++i) {
BigRow row = new BigRow(i);
writer.insert(10, row);
@@ -1648,7 +1649,7 @@ public class TestInputOutputFormat {
// write a base file in partition 0
OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
- .writingBase(true).bucket(0).inspector(inspector));
+ .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0]));
for(int i=0; i < 10; ++i) {
writer.insert(10, new MyRow(i, 2 * i));
}
@@ -1661,7 +1662,7 @@ public class TestInputOutputFormat {
// write a delta file in partition 0
writer = new OrcRecordUpdater(partDir[0],
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
- .writingBase(true).bucket(1).inspector(inspector));
+ .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0]));
for(int i=10; i < 20; ++i) {
writer.insert(10, new MyRow(i, 2*i));
}
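A note on the recurring List<Long> -> List<AcidInputFormat.DeltaMetaData> change in the SplitInfo constructors above: splits now carry structured per-delta metadata rather than a flat list of transaction ids. The real class is defined in the AcidInputFormat.java portion of this commit (not reproduced here); the stand-in below is only a guess at the minimum state such an object needs in order to name 1.3.0-style delta directories, with every name and field assumed for illustration.

// Hypothetical stand-in, NOT the actual AcidInputFormat.DeltaMetaData:
// just enough state to reconstruct the delta directory names for one delta.
import java.util.ArrayList;
import java.util.List;

final class DeltaMetaDataSketch {
  final long minTxnId;          // lowest transaction id covered by this delta
  final long maxTxnId;          // highest transaction id covered by this delta
  final List<Integer> stmtIds;  // one entry per delta_x_y_z directory; empty for pre-1.3.0 deltas

  DeltaMetaDataSketch(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
    this.minTxnId = minTxnId;
    this.maxTxnId = maxTxnId;
    this.stmtIds = stmtIds;
  }

  /** Directory names this metadata expands to, e.g. delta_0000010_0000019_0000. */
  List<String> dirNames() {
    List<String> names = new ArrayList<String>();
    if (stmtIds.isEmpty()) {
      names.add(String.format("delta_%07d_%07d", minTxnId, maxTxnId));
    } else {
      for (int stmtId : stmtIds) {
        names.add(String.format("delta_%07d_%07d_%04d", minTxnId, maxTxnId, stmtId));
      }
    }
    return names;
  }
}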
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 921e954..39f71f1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -62,12 +62,12 @@ import static org.junit.Assert.assertNull;
public class TestOrcRawRecordMerger {
private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class);
-
+//todo: why is statementId -1?
@Test
public void testOrdering() throws Exception {
ReaderKey left = new ReaderKey(100, 200, 1200, 300);
ReaderKey right = new ReaderKey();
- right.setValues(100, 200, 1000, 200);
+ right.setValues(100, 200, 1000, 200,1);
assertTrue(right.compareTo(left) < 0);
assertTrue(left.compareTo(right) > 0);
assertEquals(false, left.equals(right));
@@ -76,16 +76,16 @@ public class TestOrcRawRecordMerger {
assertEquals(true, right.equals(left));
right.setRowId(2000);
assertTrue(right.compareTo(left) > 0);
- left.setValues(1, 2, 3, 4);
- right.setValues(100, 2, 3, 4);
+ left.setValues(1, 2, 3, 4,-1);
+ right.setValues(100, 2, 3, 4,-1);
assertTrue(left.compareTo(right) < 0);
assertTrue(right.compareTo(left) > 0);
- left.setValues(1, 2, 3, 4);
- right.setValues(1, 100, 3, 4);
+ left.setValues(1, 2, 3, 4,-1);
+ right.setValues(1, 100, 3, 4,-1);
assertTrue(left.compareTo(right) < 0);
assertTrue(right.compareTo(left) > 0);
- left.setValues(1, 2, 3, 100);
- right.setValues(1, 2, 3, 4);
+ left.setValues(1, 2, 3, 100,-1);
+ right.setValues(1, 2, 3, 4,-1);
assertTrue(left.compareTo(right) < 0);
assertTrue(right.compareTo(left) > 0);
@@ -177,7 +177,7 @@ public class TestOrcRawRecordMerger {
RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
- new Reader.Options());
+ new Reader.Options(), 0);
RecordReader recordReader = pair.recordReader;
assertEquals(10, key.getTransactionId());
assertEquals(20, key.getBucketId());
@@ -203,7 +203,7 @@ public class TestOrcRawRecordMerger {
Reader reader = createMockReader();
ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
- new Reader.Options());
+ new Reader.Options(), 0);
RecordReader recordReader = pair.recordReader;
assertEquals(10, key.getTransactionId());
assertEquals(20, key.getBucketId());
@@ -489,7 +489,7 @@ public class TestOrcRawRecordMerger {
// write the empty base
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
.inspector(inspector).bucket(BUCKET).writingBase(true)
- .maximumTransactionId(100);
+ .maximumTransactionId(100).finalDestination(root);
of.getRecordUpdater(root, options).close(false);
ValidTxnList txnList = new ValidReadTxnList("200:");
@@ -515,6 +515,10 @@ public class TestOrcRawRecordMerger {
*/
@Test
public void testNewBaseAndDelta() throws Exception {
+ testNewBaseAndDelta(false);
+ testNewBaseAndDelta(true);
+ }
+ private void testNewBaseAndDelta(boolean use130Format) throws Exception {
final int BUCKET = 10;
String[] values = new String[]{"first", "second", "third", "fourth",
"fifth", "sixth", "seventh", "eighth",
@@ -532,7 +536,10 @@ public class TestOrcRawRecordMerger {
// write the base
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
- .inspector(inspector).bucket(BUCKET);
+ .inspector(inspector).bucket(BUCKET).finalDestination(root);
+ if(!use130Format) {
+ options.statementId(-1);
+ }
RecordUpdater ru = of.getRecordUpdater(root,
options.writingBase(true).maximumTransactionId(100));
for(String v: values) {
@@ -554,7 +561,8 @@ public class TestOrcRawRecordMerger {
AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
- assertEquals(new Path(root, "delta_0000200_0000200"),
+ assertEquals(new Path(root, use130Format ?
+ AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
directory.getCurrentDirectories().get(0).getPath());
Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
@@ -829,7 +837,7 @@ public class TestOrcRawRecordMerger {
// write a delta
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
.writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
- .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5);
+ .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
RecordUpdater ru = of.getRecordUpdater(root, options);
values = new String[]{"0.0", null, null, "1.1", null, null, null,
"ignore.7"};
@@ -920,6 +928,7 @@ public class TestOrcRawRecordMerger {
options.orcOptions(OrcFile.writerOptions(conf)
.stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
.memory(mgr));
+ options.finalDestination(root);
RecordUpdater ru = of.getRecordUpdater(root, options);
String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
"2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
@@ -1004,7 +1013,8 @@ public class TestOrcRawRecordMerger {
AcidOutputFormat.Options options =
new AcidOutputFormat.Options(conf)
.bucket(BUCKET).inspector(inspector).filesystem(fs)
- .writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
+ .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+ .finalDestination(root);
RecordUpdater ru = of.getRecordUpdater(root, options);
String[] values = new String[]{"a", "b", "c", "d", "e"};
for(int i=0; i < values.length; ++i) {
@@ -1047,6 +1057,14 @@ public class TestOrcRawRecordMerger {
*/
@Test
public void testRecordReaderIncompleteDelta() throws Exception {
+ testRecordReaderIncompleteDelta(false);
+ testRecordReaderIncompleteDelta(true);
+ }
+ /**
+ *
+ * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_0001
+ */
+ private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception {
final int BUCKET = 1;
Configuration conf = new Configuration();
OrcOutputFormat of = new OrcOutputFormat();
@@ -1063,7 +1081,10 @@ public class TestOrcRawRecordMerger {
AcidOutputFormat.Options options =
new AcidOutputFormat.Options(conf)
.writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
- .bucket(BUCKET).inspector(inspector).filesystem(fs);
+ .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root);
+ if(!use130Format) {
+ options.statementId(-1);
+ }
RecordUpdater ru = of.getRecordUpdater(root, options);
String[] values= new String[]{"1", "2", "3", "4", "5"};
for(int i=0; i < values.length; ++i) {
@@ -1110,8 +1131,8 @@ public class TestOrcRawRecordMerger {
splits = inf.getSplits(job, 1);
assertEquals(2, splits.length);
rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
- Path sideFile = new Path(root +
- "/delta_0000010_0000019/bucket_00001_flush_length");
+ Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
+ AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
assertEquals(true, fs.exists(sideFile));
assertEquals(24, fs.getFileStatus(sideFile).getLen());
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index 22bd4b9..22030b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -97,7 +97,8 @@ public class TestOrcRecordUpdater {
.minimumTransactionId(10)
.maximumTransactionId(19)
.inspector(inspector)
- .reporter(Reporter.NULL);
+ .reporter(Reporter.NULL)
+ .finalDestination(root);
RecordUpdater updater = new OrcRecordUpdater(root, options);
updater.insert(11, new MyRow("first"));
updater.insert(11, new MyRow("second"));
@@ -197,7 +198,8 @@ public class TestOrcRecordUpdater {
.maximumTransactionId(100)
.inspector(inspector)
.reporter(Reporter.NULL)
- .recordIdColumn(1);
+ .recordIdColumn(1)
+ .finalDestination(root);
RecordUpdater updater = new OrcRecordUpdater(root, options);
updater.update(100, new MyRow("update", 30, 10, bucket));
updater.delete(100, new MyRow("", 60, 40, bucket));
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 671e122..21adc9d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -241,7 +241,7 @@ public abstract class CompactorTest {
return sd;
}
- // I can't do this with @Before because I want to be able to control when the thead starts
+ // I can't do this with @Before because I want to be able to control when the thread starts
private void startThread(char type, boolean stopAfterOne) throws Exception {
startThread(type, stopAfterOne, new AtomicBoolean());
}
@@ -284,7 +284,7 @@ public abstract class CompactorTest {
switch (type) {
case BASE: filename = "base_" + maxTxn; break;
case LENGTH_FILE: // Fall through to delta
- case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
+ case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
case LEGACY: break; // handled below
}
@@ -508,5 +508,21 @@ public abstract class CompactorTest {
}
}
+ /**
+ * in Hive 1.3.0 delta file names changed to delta_xxxx_yyyy_zzzz; prior to that
+ * the name was delta_xxxx_yyyy. We want to run compaction tests such that both formats
+ * are used since new (1.3) code has to be able to read old files.
+ */
+ abstract boolean useHive130DeltaDirName();
+ String makeDeltaDirName(long minTxnId, long maxTxnId) {
+ return useHive130DeltaDirName() ?
+ AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
+ }
+ /**
+ * delta dir name after compaction
+ */
+ String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
+ return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
+ }
}
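The CompactorTest javadoc above states the reason for the dual naming helpers: Hive 1.3.0 code must still read deltas written with the old two-component names. A standalone illustration of a reader that accepts both forms (a sketch only, not the parsing code Hive itself uses):

// Illustrative parser: accepts both pre-1.3.0 "delta_<min>_<max>" and
// 1.3.0 "delta_<min>_<max>_<stmt>" directory names.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DeltaNameParserSketch {
  private static final Pattern DELTA = Pattern.compile("delta_(\\d+)_(\\d+)(?:_(\\d+))?");

  public static void main(String[] args) {
    for (String name : new String[]{"delta_0000021_0000024", "delta_0000021_0000024_0003"}) {
      Matcher m = DELTA.matcher(name);
      if (m.matches()) {
        long minTxn = Long.parseLong(m.group(1));
        long maxTxn = Long.parseLong(m.group(2));
        // the statement-id component is absent in directories written before 1.3.0
        Integer stmtId = m.group(3) == null ? null : Integer.valueOf(m.group(3));
        System.out.println(minTxn + ".." + maxTxn + " stmtId=" + stmtId);
      }
    }
  }
}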
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index ffdbb9a..0db732c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -139,7 +139,7 @@ public class TestCleaner extends CompactorTest {
boolean sawBase = false, sawDelta = false;
for (Path p : paths) {
if (p.getName().equals("base_20")) sawBase = true;
- else if (p.getName().equals("delta_21_24")) sawDelta = true;
+ else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true;
else Assert.fail("Unexpected file " + p.getName());
}
Assert.assertTrue(sawBase);
@@ -177,7 +177,7 @@ public class TestCleaner extends CompactorTest {
boolean sawBase = false, sawDelta = false;
for (Path path : paths) {
if (path.getName().equals("base_20")) sawBase = true;
- else if (path.getName().equals("delta_21_24")) sawDelta = true;
+ else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true;
else Assert.fail("Unexpected file " + path.getName());
}
Assert.assertTrue(sawBase);
@@ -480,4 +480,8 @@ public class TestCleaner extends CompactorTest {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(0, rsp.getCompactsSize());
}
+ @Override
+ boolean useHive130DeltaDirName() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
new file mode 100644
index 0000000..c637dd1
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+/**
+ * Same as TestCleaner but tests delta file names in Hive 1.3.0 format
+ */
+public class TestCleaner2 extends TestCleaner {
+ public TestCleaner2() throws Exception {
+ super();
+ }
+ @Override
+ boolean useHive130DeltaDirName() {
+ return true;
+ }
+}
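The TestCleaner2, TestInitiator and TestWorker2 additions all follow the same pattern: the abstract hook in CompactorTest decides which delta naming scheme the test data uses, and a thin subclass re-runs every inherited test with the hook flipped. A generic sketch of that pattern (class names are illustrative, not Hive classes):

// Generic illustration of the subclass-toggle pattern used by these compactor tests.
abstract class FormatAwareTestBase {
  abstract boolean useNewFormat();  // corresponds to useHive130DeltaDirName()

  String deltaDirName(long minTxn, long maxTxn) {
    return useNewFormat()
        ? String.format("delta_%07d_%07d_%04d", minTxn, maxTxn, 0)
        : String.format("delta_%07d_%07d", minTxn, maxTxn);
  }
}

// Old-format run: inherited tests see pre-1.3.0 style directory names.
class OldFormatTest extends FormatAwareTestBase {
  @Override boolean useNewFormat() { return false; }
}

// New-format run: the same tests see 1.3.0 style names.
class NewFormatTest extends FormatAwareTestBase {
  @Override boolean useNewFormat() { return true; }
}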
http://git-wip-us.apache.org/repos/asf/hive/blob/66feedc5/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 00b13de..0b0b1da 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -713,5 +713,9 @@ public class TestInitiator extends CompactorTest {
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(0, compacts.size());
}
+ @Override
+ boolean useHive130DeltaDirName() {
+ return false;
+ }
}