Posted to commits@hive.apache.org by sa...@apache.org on 2018/02/23 16:31:15 UTC
[17/21] hive git commit: HIVE-18192: Introduce WriteID per table rather than using global transaction ID (Sankar Hariappan, reviewed by Eugene Koifman)
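
The gist of the change: a transaction no longer stamps data with the global transaction ID; it first allocates a per-table write ID and stamps delta/base files and ROW__IDs with that. A minimal sketch of the allocation flow, using only the metastore calls exercised in the test diffs below (names as in the diff, error handling omitted):

    // Open a txn, then allocate this table's next write ID for it.
    OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
    long txnId = openTxns.getTxn_ids().get(0);
    AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(
        new AllocateTableWriteIdsRequest(openTxns.getTxn_ids(), "default", "t"));
    long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
    // writeId is 1 for the table's first writer, regardless of the global txnId.
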
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 236e585..5c13781 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StringableMap;
-import org.apache.hadoop.hive.common.ValidCompactorTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -110,7 +110,7 @@ public class CompactorMR {
}
private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
- ValidTxnList txns, CompactionInfo ci) {
+ ValidWriteIdList writeIds, CompactionInfo ci) {
JobConf job = new JobConf(conf);
job.setJobName(jobName);
job.setOutputKeyClass(NullWritable.class);
@@ -135,7 +135,7 @@ public class CompactorMR {
job.setBoolean(IS_COMPRESSED, sd.isCompressed());
job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
job.setInt(NUM_BUCKETS, sd.getNumBuckets());
- job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+ job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
if (ci.properties != null) {
overrideTblProps(job, t.getParameters(), ci.properties);
@@ -197,12 +197,12 @@ public class CompactorMR {
* @param jobName name to run this job with
* @param t metastore table
* @param sd metastore storage descriptor
- * @param txns list of valid transactions
+ * @param writeIds list of valid write ids
* @param ci CompactionInfo
* @throws java.io.IOException if the job fails
*/
- void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
- ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
+ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds,
+ CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
@@ -213,18 +213,18 @@ public class CompactorMR {
if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
LOG.debug("Going to delete directories for aborted transactions for MM table "
+ t.getDbName() + "." + t.getTableName());
- removeFiles(conf, sd.getLocation(), txns, t);
+ removeFiles(conf, sd.getLocation(), writeIds, t);
return;
}
- JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci);
+ JobConf job = createBaseJobConf(conf, jobName, t, sd, writeIds, ci);
// Figure out and encode what files we need to read. We do this here (rather than in
// getSplits below) because as part of this we discover our minimum and maximum transactions,
// and discovering that in getSplits is too late as we then have no way to pass it to our
// mapper.
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns, false, true);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds, false, true);
List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
if(parsedDeltas.size() > maxDeltastoHandle) {
@@ -242,14 +242,14 @@ public class CompactorMR {
"runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
- JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns, ci);
+ JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, writeIds, ci);
launchCompactionJob(jobMinorCompact,
null, CompactionType.MINOR, null,
parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
maxDeltastoHandle, -1, conf, txnHandler, ci.id, jobName);
}
//now recompute state since we've done minor compactions and have different 'best' set of deltas
- dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+ dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
}
StringableList dirsToSearch = new StringableList();
@@ -280,8 +280,8 @@ public class CompactorMR {
if (parsedDeltas.size() == 0 && dir.getOriginalFiles().size() == 0) {
// Skip compaction if there's no delta files AND there's no original files
String minOpenInfo = ".";
- if(txns.getMinOpenTxn() != null) {
- minOpenInfo = " with min Open " + JavaUtils.txnIdToString(txns.getMinOpenTxn()) +
+ if(writeIds.getMinOpenWriteId() != null) {
+ minOpenInfo = " with min Open " + JavaUtils.writeIdToString(writeIds.getMinOpenWriteId()) +
". Compaction cannot compact above this txnid";
}
LOG.error("No delta files or original files found to compact in " + sd.getLocation() +
@@ -316,8 +316,8 @@ public class CompactorMR {
LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
dirsToSearch.add(delta.getPath());
deltaDirs.add(delta.getPath());
- minTxn = Math.min(minTxn, delta.getMinTransaction());
- maxTxn = Math.max(maxTxn, delta.getMaxTransaction());
+ minTxn = Math.min(minTxn, delta.getMinWriteId());
+ maxTxn = Math.max(maxTxn, delta.getMaxWriteId());
}
if (baseDir != null) job.set(BASE_DIR, baseDir.toString());
@@ -379,9 +379,9 @@ public class CompactorMR {
}
// Remove the directories for aborted transactions only
- private void removeFiles(HiveConf conf, String location, ValidTxnList txnList, Table t)
+ private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t)
throws IOException {
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList,
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList,
Ref.from(false), false, t.getParameters());
// For MM table, we only want to delete delta dirs for aborted txns.
List<FileStatus> abortedDirs = dir.getAbortedDirectories();
@@ -718,13 +718,13 @@ public class CompactorMR {
@SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
AcidInputFormat<WritableComparable, V> aif =
instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME));
- ValidTxnList txnList =
- new ValidCompactorTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+ ValidWriteIdList writeIdList =
+ new ValidCompactorWriteIdList(jobConf.get(ValidWriteIdList.VALID_WRITEIDS_KEY));
boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
AcidInputFormat.RawReader<V> reader =
aif.getRawReader(jobConf, isMajor, split.getBucket(),
- txnList, split.getBaseDir(), split.getDeltaDirs());
+ writeIdList, split.getBaseDir(), split.getDeltaDirs());
RecordIdentifier identifier = reader.createKey();
V value = reader.createValue();
getWriter(reporter, reader.getObjectInspector(), split.getBucket());
@@ -779,8 +779,8 @@ public class CompactorMR {
.isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
.tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
.reporter(reporter)
- .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
- .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+ .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+ .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
.bucket(bucket)
.statementId(-1);//setting statementId == -1 makes compacted delta files use
//delta_xxxx_yyyy format
@@ -804,8 +804,8 @@ public class CompactorMR {
.isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
.tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
.reporter(reporter)
- .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
- .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+ .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+ .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
.bucket(bucket)
.statementId(-1);//setting statementId == -1 makes compacted delta files use
//delta_xxxx_yyyy format
@@ -926,8 +926,8 @@ public class CompactorMR {
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
.writingBase(conf.getBoolean(IS_MAJOR, false))
.isCompressed(conf.getBoolean(IS_COMPRESSED, false))
- .minimumTransactionId(conf.getLong(MIN_TXN, Long.MAX_VALUE))
- .maximumTransactionId(conf.getLong(MAX_TXN, Long.MIN_VALUE))
+ .minimumWriteId(conf.getLong(MIN_TXN, Long.MAX_VALUE))
+ .maximumWriteId(conf.getLong(MAX_TXN, Long.MIN_VALUE))
.bucket(0)
.statementId(-1);
Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent();
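
Two things worth noting in the CompactorMR diff above: the JobConf keys MIN_TXN/MAX_TXN are retained but now carry minimum/maximum write IDs (they feed minimumWriteId()/maximumWriteId()), and the write-id list makes the same driver-to-mapper round trip through the job conf that the txn list used to. A sketch of that round trip, assuming writeIds.toString() emits the string form that ValidCompactorWriteIdList(String) parses, as the diff implies:

    // Driver side: serialize the table's valid write IDs into the job conf.
    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
    // Mapper side: rebuild the compactor-specific view from the same key.
    ValidWriteIdList writeIdList =
        new ValidCompactorWriteIdList(jobConf.get(ValidWriteIdList.VALID_WRITEIDS_KEY));
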
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a52e023..7eda7fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -21,11 +21,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -88,8 +90,6 @@ public class Initiator extends CompactorThread {
startedAt = System.currentTimeMillis();
//todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
- ValidTxnList txns =
- TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
@@ -143,12 +143,21 @@ public class Initiator extends CompactorThread {
", assuming it has been dropped and moving on.");
continue;
}
+
+ // Compaction doesn't run inside a transaction, so pass null for validTxnList.
+ // The response has one entry per table, so we get exactly one ValidWriteIdList.
+ String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+ GetValidWriteIdsRequest rqst
+ = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null);
+ ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
+ txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
+
StorageDescriptor sd = resolveStorageDescriptor(t, p);
String runAs = findUserToRunAs(sd.getLocation(), t);
/*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
* Long term we should consider having a thread pool here and running checkForCompactionS
* in parallel*/
- CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, t.getParameters(), runAs);
+ CompactionType compactionNeeded = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs);
if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
} catch (Throwable t) {
LOG.error("Caught exception while trying to determine if we should compact " +
@@ -215,7 +224,7 @@ public class Initiator extends CompactorThread {
}
private CompactionType checkForCompaction(final CompactionInfo ci,
- final ValidTxnList txns,
+ final ValidWriteIdList writeIds,
final StorageDescriptor sd,
final Map<String, String> tblproperties,
final String runAs)
@@ -227,7 +236,7 @@ public class Initiator extends CompactorThread {
return CompactionType.MAJOR;
}
if (runJobAsSelf(runAs)) {
- return determineCompactionType(ci, txns, sd, tblproperties);
+ return determineCompactionType(ci, writeIds, sd, tblproperties);
} else {
LOG.info("Going to initiate as user " + runAs);
UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
@@ -235,7 +244,7 @@ public class Initiator extends CompactorThread {
CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
@Override
public CompactionType run() throws Exception {
- return determineCompactionType(ci, txns, sd, tblproperties);
+ return determineCompactionType(ci, writeIds, sd, tblproperties);
}
});
try {
@@ -248,7 +257,7 @@ public class Initiator extends CompactorThread {
}
}
- private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
+ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds,
StorageDescriptor sd, Map<String, String> tblproperties)
throws IOException, InterruptedException {
@@ -259,7 +268,7 @@ public class Initiator extends CompactorThread {
boolean noBase = false;
Path location = new Path(sd.getLocation());
FileSystem fs = location.getFileSystem(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns, false, false);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, writeIds, false, false);
Path base = dir.getBaseDirectory();
long baseSize = 0;
FileStatus stat = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index e5ebf9a..c47e78e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,16 +18,17 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.Driver;
@@ -138,10 +139,15 @@ public class Worker extends CompactorThread {
}
final boolean isMajor = ci.isMajorCompaction();
- final ValidTxnList txns =
- TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
- LOG.debug("ValidCompactTxnList: " + txns.writeToString());
- txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
+
+ // Compaction doesn't run inside a transaction, so pass null for validTxnList.
+ // The response has one entry per table, so we get exactly one ValidWriteIdList.
+ String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+ GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null);
+ final ValidWriteIdList tblValidWriteIds =
+ TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
+ LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+ txnHandler.setCompactionHighestWriteId(ci, tblValidWriteIds.getHighWatermark());
final StringBuilder jobName = new StringBuilder(name);
jobName.append("-compactor-");
jobName.append(ci.getFullPartitionName());
@@ -164,14 +170,14 @@ public class Worker extends CompactorThread {
launchedJob = true;
try {
if (runJobAsSelf(runAs)) {
- mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
+ mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
} else {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler);
+ mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
return null;
}
});
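
Initiator and Worker now share the same lookup, consolidated here as a sketch assembled from the two diffs above: compaction runs outside any transaction (hence the null validTxnList), and since the request names a single table the response carries exactly one entry.

    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
    GetValidWriteIdsRequest rqst =
        new GetValidWriteIdsRequest(Collections.singletonList(fullTableName), null);
    ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
        txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
    // Worker additionally records the high watermark for the compaction:
    txnHandler.setCompactionHighestWriteId(ci, tblValidWriteIds.getHighWatermark());
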
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 2663061..45890ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.metastore.txn;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -56,7 +58,7 @@ import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
/**
- * Tests for TxnHandler.
+ * Tests for CompactionTxnHandler.
*/
public class TestCompactionTxnHandler {
@@ -478,6 +480,13 @@ public class TestCompactionTxnHandler {
String tableName = "adp_table";
OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
long txnId = openTxns.getTxn_ids().get(0);
+
+ AllocateTableWriteIdsResponse writeIds
+ = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(openTxns.getTxn_ids(), dbName, tableName));
+ long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
+ assertEquals(txnId, writeIds.getTxnToWriteIds().get(0).getTxnId());
+ assertEquals(1, writeId);
+
// lock a table, as in dynamic partitions
LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
lc.setIsDynamicPartitionWrite(true);
@@ -489,7 +498,7 @@ public class TestCompactionTxnHandler {
LockResponse lock = txnHandler.lock(lr);
assertEquals(LockState.ACQUIRED, lock.getState());
- AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName,
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeId, dbName, tableName,
Arrays.asList("ds=yesterday", "ds=today"));
adp.setOperationType(dop);
txnHandler.addDynamicPartitions(adp);
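
The test change above tracks the widened AddDynamicPartitions constructor, which now takes the write ID alongside the transaction ID (the same change appears in TestTxnHandler below):

    // before: new AddDynamicPartitions(txnId, dbName, tableName, partNames)
    // after:
    AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeId, dbName, tableName,
        Arrays.asList("ds=yesterday", "ds=today"));
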
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 9eaf039..7b510dd 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -71,6 +73,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -149,7 +152,14 @@ public class TestTxnHandler {
txnHandler.abortTxn(new AbortTxnRequest(1));
List<String> parts = new ArrayList<String>();
parts.add("p=1");
- AddDynamicPartitions adp = new AddDynamicPartitions(3, "default", "T", parts);
+
+ AllocateTableWriteIdsResponse writeIds
+ = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(3L), "default", "T"));
+ long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
+ assertEquals(3, writeIds.getTxnToWriteIds().get(0).getTxnId());
+ assertEquals(1, writeId);
+
+ AddDynamicPartitions adp = new AddDynamicPartitions(3, writeId, "default", "T", parts);
adp.setOperationType(DataOperationType.INSERT);
txnHandler.addDynamicPartitions(adp);
GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 2a1545f..470856b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -760,14 +760,18 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
Assert.assertEquals(536936448,
BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
Assert.assertEquals("", 4, rs.size());
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
+ Assert.assertTrue(rs.get(0),
+ rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
+ Assert.assertTrue(rs.get(1),
+ rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
+ Assert.assertTrue(rs.get(2),
+ rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000016_0000016_0000/bucket_00001"));
+ Assert.assertTrue(rs.get(3),
+ rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000001_0000001_0000/bucket_00001"));
//run Compaction
runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
@@ -777,14 +781,18 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
LOG.warn(s);
}
Assert.assertEquals("", 4, rs.size());
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
- Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000016/bucket_00000"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
- Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
- Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
- Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":16,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000016/bucket_00001"));
+ Assert.assertTrue(rs.get(0),
+ rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
+ Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00000"));
+ Assert.assertTrue(rs.get(1),
+ rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
+ Assert.assertTrue(rs.get(2),
+ rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
+ Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
+ Assert.assertTrue(rs.get(3),
+ rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000001/bucket_00001"));
//make sure they are the same before and after compaction
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index bab6d5e..2eead9e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -366,15 +366,15 @@ public class TestTxnCommands2 {
* Note: order of rows in a file ends up being the reverse of order in values clause (why?!)
*/
String[][] expected = {
- {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"},
- {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
- {"{\"transactionid\":22,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
- {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"},
- {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"},
- {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"},
- {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"},
- {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6", "bucket_00001"},
- {"{\"transactionid\":20,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
+ {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6", "bucket_00001"},
+ {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
};
Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
for(int i = 0; i < expected.length; i++) {
@@ -759,11 +759,11 @@ public class TestTxnCommands2 {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDelta == 1) {
- Assert.assertEquals("delta_0000024_0000024_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numDelta == 2) {
- Assert.assertEquals("delta_0000025_0000025_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
Assert.assertEquals(1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
@@ -772,7 +772,7 @@ public class TestTxnCommands2 {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
if (numDeleteDelta == 1) {
- Assert.assertEquals("delete_delta_0000024_0000024_0000", status[i].getPath().getName());
+ Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
@@ -819,7 +819,7 @@ public class TestTxnCommands2 {
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir has two bucket files
- Assert.assertEquals("base_0000025", status[i].getPath().getName());
+ Assert.assertEquals("base_0000002", status[i].getPath().getName());
Assert.assertEquals(1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
@@ -845,7 +845,7 @@ public class TestTxnCommands2 {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
Assert.assertEquals(1, status.length);
- Assert.assertEquals("base_0000025", status[0].getPath().getName());
+ Assert.assertEquals("base_0000002", status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(1, buckets.length);
@@ -861,7 +861,7 @@ public class TestTxnCommands2 {
public void testValidTxnsBookkeeping() throws Exception {
// 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
runStatementOnDriver("select * from " + Table.NONACIDORCTBL);
- String value = hiveConf.get(ValidTxnList.VALID_TXNS_KEY);
+ String value = hiveConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
Assert.assertNull("The entry should be null for query that doesn't involve ACID tables", value);
}
@@ -874,9 +874,9 @@ public class TestTxnCommands2 {
//this will cause next txn to be marked aborted but the data is still written to disk
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData2));
- assert hiveConf.get(ValidTxnList.VALID_TXNS_KEY) == null : "previous txn should've cleaned it";
+ assert hiveConf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY) == null : "previous txn should've cleaned it";
//so now if HIVEFETCHTASKCONVERSION were to use a stale value, it would use a
- //ValidTxnList with HWM=MAX_LONG, i.e. include the data for aborted txn
+ //ValidWriteIdList with HWM=MAX_LONG, i.e. include the data for aborted txn
List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
Assert.assertEquals("Extra data", 2, rs.size());
}
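
The renumbered expectations in these test diffs all follow from the write-id switch: delta and base directory names now encode the table-scoped write ID, so the first write to any table lands in delta_0000001_0000001_0000 and a compaction through write ID 2 produces base_0000002, no matter how many global transactions ran before. Note that the ROW__ID struct still labels its first field "transactionid" even though the value is now the write ID, which is why the expectations pair "transactionid":1 with delta_0000001_* paths.

    delta_0000001_0000001_0000    <-  minWriteId _ maxWriteId _ statementId
    base_0000002                  <-  all rows compacted up through write ID 2
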
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 3a3272f..0a305a4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -105,13 +105,13 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][]{
- {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000020_0000020_0000/000000_0"},
- {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}};
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}};
checkResult(expected, testQuery, isVectorized, "load data inpath");
runStatementOnDriver("update T set b = 17 where a = 1");
String[][] expected2 = new String[][]{
- {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"},
- {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000023_0000023_0000/bucket_00000"}
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"}
};
checkResult(expected2, testQuery, isVectorized, "update");
@@ -121,15 +121,15 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'minor'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected3 = new String[][] {
- {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000020_0000027/bucket_00000"},
- {"{\"transactionid\":26,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000020_0000027/bucket_00000"}
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004/bucket_00000"},
+ {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004/bucket_00000"}
};
checkResult(expected3, testQuery, isVectorized, "delete compact minor");
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T");
String[][] expected4 = new String[][]{
- {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000031/000000_0"},
- {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000031/000000_0"}};
+ {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"},
+ {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"}};
checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite");
//load same data again (additive)
@@ -138,9 +138,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("delete from T where a = 3");//matches 2 rows
runStatementOnDriver("insert into T values(2,2)");
String[][] expected5 = new String[][]{
- {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"},
- {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"},
- {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000037_0000037_0000/bucket_00000"}
+ {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
+ {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"},
+ {"{\"transactionid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000"}
};
checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update");
@@ -148,9 +148,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected6 = new String[][]{
- {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000037/bucket_00000"},
- {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000037/bucket_00000"},
- {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000037/bucket_00000"}
+ {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009/bucket_00000"},
+ {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009/bucket_00000"},
+ {"{\"transactionid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009/bucket_00000"}
};
checkResult(expected6, testQuery, isVectorized, "load data inpath compact major");
}
@@ -173,22 +173,22 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
- //normal insert
- {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000016_0000/bucket_00000"},
- {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000016_0000/bucket_00000"},
- //Load Data
- {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000021_0000021_0000/000000_0"},
- {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000021_0000021_0000/000000_0"}};
+ //normal insert
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000"},
+ //Load Data
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}};
checkResult(expected, testQuery, isVectorized, "load data inpath");
//test minor compaction
runStatementOnDriver("alter table T compact 'minor'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected1 = new String[][] {
- {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000021/bucket_00000"},
- {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000021/bucket_00000"},
- {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000016_0000021/bucket_00000"},
- {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000016_0000021/bucket_00000"}
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002/bucket_00000"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002/bucket_00000"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002/bucket_00000"}
};
checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)");
@@ -197,11 +197,11 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected2 = new String[][] {
- {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000027/bucket_00000"},
- {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000027/bucket_00000"},
- {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000027/bucket_00000"},
- {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000027/bucket_00000"},
- {"{\"transactionid\":27,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000027/bucket_00000"}
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003/bucket_00000"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003/bucket_00000"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003/bucket_00000"},
+ {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003/bucket_00000"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
@@ -210,8 +210,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
String[][] expected3 = new String[][] {
- {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/000000_0"},
- {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/000000_0"}};
+ {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000004/000000_0"},
+ {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000004/000000_0"}};
checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite");
//one more major compaction
@@ -219,9 +219,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected4 = new String[][] {
- {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000036/bucket_00000"},
- {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000036/bucket_00000"},
- {"{\"transactionid\":36,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000036/bucket_00000"}};
+ {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005/bucket_00000"},
+ {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005/bucket_00000"},
+ {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005/bucket_00000"}};
checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)");
}
/**
@@ -254,24 +254,23 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
/*
{"transactionid":0,"bucketid":536870912,"rowid":0} 0 2/000000_0
{"transactionid":0,"bucketid":536870912,"rowid":1} 0 4/000000_0
-{"transactionid":24,"bucketid":536870912,"rowid":0} 4 4/delta_0000024_0000024_0000/000000_0
-{"transactionid":24,"bucketid":536870912,"rowid":1} 5 5/delta_0000024_0000024_0000/000000_0
+{"transactionid":1,"bucketid":536870912,"rowid":0} 4 4/delta_0000001_0000001_0000/000000_0
+{"transactionid":1,"bucketid":536870912,"rowid":1} 5 5/delta_0000001_0000001_0000/000000_0
*/
String[][] expected = new String[][] {
- //from pre-acid insert
- {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"},
- {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"},
- //from Load Data into acid converted table
- {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000024_0000024_0000/000000_0"},
- {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000024_0000024_0000/000000_0"},
- {"{\"transactionid\":24,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000024_0000024_0000/000001_0"},
- {"{\"transactionid\":24,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000024_0000024_0000/000001_0"},
- {"{\"transactionid\":24,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000024_0000024_0000/000002_0"},
- {"{\"transactionid\":24,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000024_0000024_0000/000002_0"},
+ //from pre-acid insert
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"},
+ //from Load Data into acid converted table
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"},
+ {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000001_0000001_0000/000001_0"},
+ {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000001_0000001_0000/000001_0"},
+ {"{\"transactionid\":1,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000001_0000001_0000/000002_0"},
+ {"{\"transactionid\":1,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000001_0000001_0000/000002_0"},
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
-
//create more staging data with copy_N files and do LD+Overwrite
runStatementOnDriver("insert into Tstage values(5,6),(7,8)");
runStatementOnDriver("insert into Tstage values(8,8)");
@@ -279,9 +278,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
String[][] expected2 = new String[][] {
- {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000030/000000_0"},
- {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000030/000000_0"},
- {"{\"transactionid\":30,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000030/000001_0"}
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000002/000000_0"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000002/000000_0"},
+ {"{\"transactionid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000002/000001_0"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite");
@@ -291,11 +290,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
TestTxnCommands2.runWorker(hiveConf);
String[][] expected3 = new String[][] {
- {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/bucket_00000"},
- {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/bucket_00000"},
- {"{\"transactionid\":30,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000033/bucket_00001"},
- {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000033/bucket_00000"}
-
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000003/bucket_00000"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000003/bucket_00000"},
+ {"{\"transactionid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000003/bucket_00001"},
+ {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000003/bucket_00000"}
};
checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)");
}
@@ -326,12 +324,12 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
List<String> rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
String[][] expected = new String[][] {
- {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"},
- {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"},
- {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000024_0000024_0000/000000_0"},
- {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000024_0000024_0000/000000_0"},
- {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000028_0000028_0000/000000_0"},
- {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000028_0000028_0000/000000_0"}};
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000002_0000002_0000/000000_0"},
+ {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000002_0000002_0000/000000_0"},
+ {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000003_0000003_0000/000000_0"},
+ {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000003_0000003_0000/000000_0"}};
checkExpected(rs, expected, "load data inpath partitioned");
@@ -340,10 +338,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("truncate table Tstage");
runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)");
String[][] expected2 = new String[][] {
- {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"},
- {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"},
- {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000033/000000_0"},
- {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000033/000000_0"}};
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"},
+ {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000004/000000_0"},
+ {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000004/000000_0"}};
rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
checkExpected(rs, expected2, "load data inpath partitioned overwrite");
}
@@ -405,20 +403,20 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
- {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"},
- {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"},
- {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000019_0000019_0001/000000_0"},
- {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000019_0000019_0001/000000_0"}
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"},
+ {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"}
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected2 = new String[][] {
- {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000019/bucket_00000"},
- {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000019/bucket_00000"},
- {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000019/bucket_00000"},
- {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000019/bucket_00000"}
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001/bucket_00000"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
//at least for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154
@@ -444,8 +442,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
"select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
String[][] expected = new String[][] {
- {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"},
- {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"}
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"},
+ {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}
};
checkResult(expected, testQuery, isVectorized, "load data inpath");
}