Posted to commits@hive.apache.org by se...@apache.org on 2018/04/27 00:35:08 UTC
[2/2] hive git commit: HIVE-19124 : implement a basic major compactor for MM tables (Sergey Shelukhin, reviewed by Eugene Koifman and Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ea18769f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ea18769f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ea18769f
Branch: refs/heads/branch-3
Commit: ea18769f026429ea6ebbbd66858920ebf869a9d6
Parents: cc0ef46
Author: sergey <se...@apache.org>
Authored: Thu Apr 26 17:19:33 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Apr 26 17:19:52 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../hive/ql/txn/compactor/TestCompactor.java | 261 ++++++++++++--
.../java/org/apache/hadoop/hive/ql/Driver.java | 28 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 3 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 19 +-
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 2 -
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 3 +-
.../apache/hadoop/hive/ql/plan/PlanUtils.java | 3 +
.../hive/ql/txn/compactor/CompactorMR.java | 337 ++++++++++++++++++-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 25 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 6 +-
.../hive/ql/TestTxnCommandsForMmTable.java | 73 +---
.../metastore/api/hive_metastoreConstants.java | 7 +-
.../hive/metastore/HiveMetaStoreClient.java | 6 +-
.../hadoop/hive/metastore/IMetaStoreClient.java | 4 +-
.../hadoop/hive/metastore/txn/TxnUtils.java | 20 +-
.../HiveMetaStoreClientPreCatalog.java | 4 +-
.../hive/common/ValidReaderWriteIdList.java | 4 +
18 files changed, 654 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
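For orientation: the MM major compaction added here is query-driven rather than MR-driven. A rough sketch of the three steps performed by CompactorMR.runMmCompaction below, with hypothetical table and location names (the real code derives them from the StorageDescriptor and a random tmp path):

  // 1) Create a temporary, non-transactional table whose location is a staging _base dir.
  String tmpTable = "default.tmp_compactor_mm_nonpart_" + System.currentTimeMillis();
  String ctQuery = "create temporary table " + tmpTable + "(a int, b string) stored as orc"
      + " location '/warehouse/mm_nonpart/_tmp_12345/_base'"
      + " tblproperties ('transactional'='false')";
  // 2) Rewrite the current snapshot of the source table into it.
  String iowQuery = "insert overwrite table " + tmpTable + " select * from default.mm_nonpart";
  // 3) Move the single output directory into place as base_N (N = write-id high watermark)
  //    and drop the temporary table; see commitMmCompaction below.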
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b588ebb..3e40f63 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2531,6 +2531,9 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
"Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
+
+ HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
+ "Whether the compactor should compact insert-only tables. A safety switch."),
/**
* @deprecated Use MetastoreConf.COMPACTOR_HISTORY_RETENTION_SUCCEEDED
*/
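The new flag is consumed in CompactorMR below via the standard HiveConf accessor; a minimal sketch, assuming a HiveConf is already constructed:

  // assumes: import org.apache.hadoop.hive.conf.HiveConf;
  //          import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  HiveConf conf = new HiveConf();
  // Safety switch: when false, insert-only (MM) tables are skipped by the compactor.
  boolean compactMm = HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM);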
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index b19aa23..5b5d818 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,11 +31,13 @@ import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,9 +48,11 @@ import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -704,16 +709,7 @@ public class TestCompactor {
// it has an open txn in it
writeBatch(connection, writer, true);
- // Now, compact
- TxnStore txnHandler = TxnUtils.getTxnStore(conf);
- txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
- Worker t = new Worker();
- t.setThreadId((int) t.getId());
- t.setConf(conf);
- AtomicBoolean stop = new AtomicBoolean(true);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
- t.run();
+ runMajorCompaction(dbName, tblName);
// Find the location of the table
IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -827,16 +823,7 @@ public class TestCompactor {
txnBatch.abort();
- // Now, compact
- TxnStore txnHandler = TxnUtils.getTxnStore(conf);
- txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
- Worker t = new Worker();
- t.setThreadId((int) t.getId());
- t.setConf(conf);
- AtomicBoolean stop = new AtomicBoolean(true);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
- t.run();
+ runMajorCompaction(dbName, tblName);
// Find the location of the table
IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -860,6 +847,228 @@ public class TestCompactor {
}
}
+
+ @Test
+ public void mmTable() throws Exception {
+ String dbName = "default";
+ String tblName = "mm_nonpart";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS ORC" +
+ " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+ driver);
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ msClient.close();
+
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+ verifyFooBarResult(tblName, 1);
+
+ // Check that we have two deltas.
+ FileSystem fs = FileSystem.get(conf);
+ verifyDeltaCount(table.getSd(), fs, 2);
+
+ runMajorCompaction(dbName, tblName);
+ verifyFooBarResult(tblName, 1);
+ verifyHasBase(table.getSd(), fs, "base_0000002");
+
+ // Make sure we don't compact if we don't need to compact.
+ runMajorCompaction(dbName, tblName);
+ verifyFooBarResult(tblName, 1);
+ verifyHasBase(table.getSd(), fs, "base_0000002");
+ }
+
+ @Test
+ public void mmTableBucketed() throws Exception {
+ String dbName = "default";
+ String tblName = "mm_nonpart";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) CLUSTERED BY (a) " +
+ "INTO 64 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', " +
+ "'transactional_properties'='insert_only')", driver);
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ msClient.close();
+
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+ verifyFooBarResult(tblName, 1);
+
+ // Check that we have two deltas.
+ FileSystem fs = FileSystem.get(conf);
+ verifyDeltaCount(table.getSd(), fs, 2);
+
+ runMajorCompaction(dbName, tblName);
+ verifyFooBarResult(tblName, 1);
+ String baseDir = "base_0000002";
+ verifyHasBase(table.getSd(), fs, baseDir);
+
+ FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), baseDir),
+ AcidUtils.hiddenFileFilter);
+ Assert.assertEquals(Lists.newArrayList(files).toString(), 64, files.length);
+ }
+
+ @Test
+ public void mmTableOpenWriteId() throws Exception {
+ String dbName = "default";
+ String tblName = "mm_nonpart";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS TEXTFILE" +
+ " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+ driver);
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ msClient.close();
+
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+ verifyFooBarResult(tblName, 1);
+
+ long openTxnId = msClient.openTxn("test");
+ long openWriteId = msClient.allocateTableWriteId(openTxnId, dbName, tblName);
+ Assert.assertEquals(3, openWriteId); // Just check to make sure base_5 below is not new.
+
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+ verifyFooBarResult(tblName, 2);
+
+ runMajorCompaction(dbName, tblName); // Don't compact 4 and 5; 3 is opened.
+ FileSystem fs = FileSystem.get(conf);
+ verifyHasBase(table.getSd(), fs, "base_0000002");
+ verifyDirCount(table.getSd(), fs, 1, AcidUtils.baseFileFilter);
+ verifyFooBarResult(tblName, 2);
+
+ runCleaner(conf);
+ verifyHasDir(table.getSd(), fs, "delta_0000004_0000004_0000", AcidUtils.deltaFileFilter);
+ verifyHasDir(table.getSd(), fs, "delta_0000005_0000005_0000", AcidUtils.deltaFileFilter);
+ verifyFooBarResult(tblName, 2);
+
+ msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
+ runMajorCompaction(dbName, tblName); // Compact 4 and 5.
+ verifyFooBarResult(tblName, 2);
+ verifyHasBase(table.getSd(), fs, "base_0000005");
+ runCleaner(conf);
+ verifyDeltaCount(table.getSd(), fs, 0);
+ }
+
+ private void verifyHasBase(
+ StorageDescriptor sd, FileSystem fs, String baseName) throws Exception {
+ verifyHasDir(sd, fs, baseName, AcidUtils.baseFileFilter);
+ }
+
+ private void verifyHasDir(
+ StorageDescriptor sd, FileSystem fs, String name, PathFilter filter) throws Exception {
+ FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
+ for (FileStatus file : stat) {
+ if (name.equals(file.getPath().getName())) return;
+ }
+ Assert.fail("Cannot find " + name + ": " + Arrays.toString(stat));
+ }
+
+ private void verifyDeltaCount(
+ StorageDescriptor sd, FileSystem fs, int count) throws Exception {
+ verifyDirCount(sd, fs, count, AcidUtils.deltaFileFilter);
+ }
+
+ private void verifyDirCount(
+ StorageDescriptor sd, FileSystem fs, int count, PathFilter filter) throws Exception {
+ FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
+ Assert.assertEquals(Arrays.toString(stat), count, stat.length);
+ }
+
+ @Test
+ public void mmTablePartitioned() throws Exception {
+ String dbName = "default";
+ String tblName = "mm_part";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " PARTITIONED BY(ds int) STORED AS TEXTFILE" +
+ " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+ driver);
+
+ executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 1)", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 3)", driver);
+
+ verifyFooBarResult(tblName, 3);
+
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Partition p1 = msClient.getPartition(dbName, tblName, "ds=1"),
+ p2 = msClient.getPartition(dbName, tblName, "ds=2"),
+ p3 = msClient.getPartition(dbName, tblName, "ds=3");
+ msClient.close();
+
+ FileSystem fs = FileSystem.get(conf);
+ verifyDeltaCount(p1.getSd(), fs, 3);
+ verifyDeltaCount(p2.getSd(), fs, 2);
+ verifyDeltaCount(p3.getSd(), fs, 1);
+
+ runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+ verifyFooBarResult(tblName, 3);
+ verifyDeltaCount(p3.getSd(), fs, 1);
+ verifyHasBase(p1.getSd(), fs, "base_0000006");
+ verifyHasBase(p2.getSd(), fs, "base_0000006");
+
+ executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+
+ runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+ // Make sure we don't compact if we don't need to compact; but do if we do.
+ verifyFooBarResult(tblName, 4);
+ verifyDeltaCount(p3.getSd(), fs, 1);
+ verifyHasBase(p1.getSd(), fs, "base_0000006");
+ verifyHasBase(p2.getSd(), fs, "base_0000008");
+
+ }
+
+ private void verifyFooBarResult(String tblName, int count) throws Exception, IOException {
+ List<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ executeStatementOnDriver("SELECT a,b FROM " + tblName, driver);
+ driver.getResults(valuesReadFromHiveDriver);
+ Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size());
+ int fooCount = 0, barCount = 0;
+ for (String s : valuesReadFromHiveDriver) {
+ if ("1\tfoo".equals(s)) {
+ ++fooCount;
+ } else if ("2\tbar".equals(s)) {
+ ++barCount;
+ } else {
+ Assert.fail("Unexpected " + s);
+ }
+ }
+ Assert.assertEquals(fooCount, count);
+ Assert.assertEquals(barCount, count);
+ }
+
+ private void runMajorCompaction(
+ String dbName, String tblName, String... partNames) throws MetaException {
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setConf(conf);
+ t.init(new AtomicBoolean(true), new AtomicBoolean());
+ if (partNames.length == 0) {
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+ t.run();
+ } else {
+ for (String partName : partNames) {
+ CompactionRequest cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+ cr.setPartitionname(partName);
+ txnHandler.compact(cr);
+ t.run();
+ }
+ }
+ }
+
@Test
public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
String dbName = "default";
@@ -885,16 +1094,7 @@ public class TestCompactor {
// Start a third batch, but don't close it.
writeBatch(connection, writer, true);
- // Now, compact
- TxnStore txnHandler = TxnUtils.getTxnStore(conf);
- txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
- Worker t = new Worker();
- t.setThreadId((int) t.getId());
- t.setConf(conf);
- AtomicBoolean stop = new AtomicBoolean(true);
- AtomicBoolean looped = new AtomicBoolean();
- t.init(stop, looped);
- t.run();
+ runMajorCompaction(dbName, tblName);
// Find the location of the table
IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -1443,6 +1643,7 @@ public class TestCompactor {
throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
}
}
+
static void createTestDataFile(String filename, String[] lines) throws IOException {
FileWriter writer = null;
try {
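The Worker boilerplate that used to be inlined in each test is now centralized in runMajorCompaction above. For a one-off compaction outside these helpers, the same pattern applies (sketch; conf, db and table names are the test's):

  TxnStore txnHandler = TxnUtils.getTxnStore(conf);
  txnHandler.compact(new CompactionRequest("default", "mm_nonpart", CompactionType.MAJOR));
  Worker t = new Worker();
  t.setThreadId((int) t.getId());
  t.setConf(conf);
  t.init(new AtomicBoolean(true), new AtomicBoolean()); // stop flag true: run a single pass
  t.run();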
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e8feeb8..e8c9ea0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -46,6 +46,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -61,6 +63,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -209,6 +213,7 @@ public class Driver implements IDriver {
private CacheUsage cacheUsage;
private CacheEntry usedCacheEntry;
+ private ValidWriteIdList compactionWriteIds = null;
private enum DriverState {
INITIALIZED,
@@ -541,8 +546,10 @@ public class Driver implements IDriver {
conf.setQueryString(queryStr);
// FIXME: sideeffect will leave the last query set at the session level
- SessionState.get().getConf().setQueryString(queryStr);
- SessionState.get().setupQueryCurrentTimestamp();
+ if (SessionState.get() != null) {
+ SessionState.get().getConf().setQueryString(queryStr);
+ SessionState.get().setupQueryCurrentTimestamp();
+ }
// Whether any error occurred during query compilation. Used for query lifetime hook.
boolean compileError = false;
@@ -1309,7 +1316,18 @@ public class Driver implements IDriver {
throw new IllegalStateException("calling recordValidWriteIds() without initializing ValidTxnList " +
JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
}
- ValidTxnWriteIdList txnWriteIds = txnMgr.getValidWriteIds(getTransactionalTableList(plan), txnString);
+ List<String> txnTables = getTransactionalTableList(plan);
+ ValidTxnWriteIdList txnWriteIds = null;
+ if (compactionWriteIds != null) {
+ if (txnTables.size() != 1) {
+ throw new LockException("Unexpected tables in compaction: " + txnTables);
+ }
+ String fullTableName = txnTables.get(0);
+ txnWriteIds = new ValidTxnWriteIdList(0L); // No transaction for the compaction for now.
+ txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
+ } else {
+ txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
+ }
String writeIdStr = txnWriteIds.toString();
conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
if (plan.getFetchTask() != null) {
@@ -2773,4 +2791,8 @@ public class Driver implements IDriver {
return false;
}
}
+
+ public void setCompactionWriteIds(ValidWriteIdList val) {
+ this.compactionWriteIds = val;
+ }
}
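The new setter is used by the compactor's runOnDriver (below) to pin the snapshot the query sees instead of asking the txn manager; a sketch, with writeIds, user and query assumed in scope:

  QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
  Driver driver = new Driver(qs, user, null, null);
  driver.setCompactionWriteIds(writeIds); // null for DDL statements that need no snapshot
  try {
    CommandProcessorResponse cpr = driver.run(query);
    if (cpr.getResponseCode() != 0) {
      throw new HiveException("Failed to run " + query, cpr.getException());
    }
  } finally {
    driver.close();
    driver.destroy();
  }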
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 5b8a120..d6f5666 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2724,7 +2724,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
return prop_string;
}
- private StringBuilder appendSerdeParams(StringBuilder builder, Map<String, String> serdeParam) {
+ public static StringBuilder appendSerdeParams(
+ StringBuilder builder, Map<String, String> serdeParam) {
serdeParam = new TreeMap<String, String>(serdeParam);
builder.append("WITH SERDEPROPERTIES ( \n");
List<String> serdeCols = new ArrayList<String>();
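Now public static so CompactorMR can reuse it when reconstructing the temp table's DDL; a usage sketch (parameter values hypothetical):

  StringBuilder query = new StringBuilder("... ROW FORMAT SERDE '...' ");
  Map<String, String> serdeParams = new HashMap<>();
  serdeParams.put("field.delim", ",");
  // Appends roughly: WITH SERDEPROPERTIES ( 'field.delim'=',' )
  DDLTask.appendSerdeParams(query, serdeParams);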
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 7b7fd5d..76569d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.lockmgr;
import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -31,12 +33,14 @@ import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -69,7 +73,7 @@ import java.util.concurrent.atomic.AtomicReference;
* with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
* The later may (usually will) be called from a timer thread.
* See {@link #getMS()} for more important concurrency/metastore access notes.
- *
+ *
* Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
* Effectively, that means any statement that has side effects. Exceptions are statements like
* Show Compactions, Show Tables, Use Database foo, etc. The transaction is started either
@@ -111,9 +115,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
/**
* if {@code true} it means current transaction is started via START TRANSACTION which means it cannot
* include any Operations which cannot be rolled back (drop partition; write to non-acid table).
- * If false, it's a single statement transaction which can include any statement. This is not a
+ * If false, it's a single statement transaction which can include any statement. This is not a
* contradiction from the user point of view who doesn't know anything about the implicit txn
- * and cannot call rollback (the statement of course can fail in which case there is nothing to
+ * and cannot call rollback (the statement of course can fail in which case there is nothing to
* rollback (assuming the statement is well implemented)).
*
* This is done so that all commands run in a transaction which simplifies implementation and
@@ -292,7 +296,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
/**
* Ensures that the current SQL statement is appropriate for the current state of the
* Transaction Manager (e.g. can call commit unless you called start transaction)
- *
+ *
* Note that support for multi-statement txns is a work-in-progress so it's only supported in
* HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
* @param queryPlan
@@ -300,7 +304,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
*/
private void verifyState(QueryPlan queryPlan) throws LockException {
if(!isTxnOpen()) {
- throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() +
+ throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() +
" for " + getQueryIdWaterMark(queryPlan));
}
if(queryPlan.getOperation() == null) {
@@ -820,7 +824,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
assert isTxnOpen();
assert validTxnList != null && !validTxnList.isEmpty();
try {
- return getMS().getValidWriteIds(txnId, tableList, validTxnList);
+ return TxnUtils.createValidTxnWriteIdList(
+ txnId, getMS().getValidWriteIds(tableList, validTxnList));
} catch (TException e) {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
}
@@ -1013,7 +1018,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
}
}
-
+
public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
List<TxnToWriteId> srcTxnToWriteIdList) throws LockException {
try {
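Since HiveMetaStoreClient.getValidWriteIds now returns the raw per-table structures (see the metastore changes below), assembling the ValidTxnWriteIdList moved to the caller; the new call shape, as a sketch with msClient and txnId in scope:

  List<TableValidWriteIds> perTable = msClient.getValidWriteIds(tableList, validTxnList);
  ValidTxnWriteIdList txnWriteIds = TxnUtils.createValidTxnWriteIdList(txnId, perTable);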
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 78eedd3..a74670b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -394,6 +394,4 @@ class DummyTxnManager extends HiveTxnManagerImpl {
}
return locks;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index ec11fec..f239535 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.LockTableDesc;
import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+
import java.util.List;
/**
@@ -253,7 +254,7 @@ public interface HiveTxnManager {
boolean recordSnapshot(QueryPlan queryPlan);
boolean isImplicitTransactionOpen();
-
+
boolean isTxnOpen();
/**
* if {@code isTxnOpen()}, returns the currently active transaction ID.
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index dde20ed..056dfa4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -110,6 +110,9 @@ public final class PlanUtils {
public static TableDesc getDefaultTableDesc(CreateTableDesc directoryDesc,
String cols, String colTypes ) {
+ // TODO: this should have an option for directory to inherit from the parent table,
+ // including bucketing and list bucketing, for the use in compaction when the
+ // latter runs inside a transaction.
TableDesc ret = getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols,
colTypes, false);;
if (directoryDesc == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index b1c2288..b698c84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,13 +17,21 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+
+import com.google.common.collect.Lists;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
@@ -33,24 +41,43 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -70,7 +97,9 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,7 +157,7 @@ public class CompactorMR {
}
job.set(FINAL_LOCATION, sd.getLocation());
- job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
+ job.set(TMP_LOCATION, generateTmpPath(sd));
job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
job.setBoolean(IS_COMPRESSED, sd.isCompressed());
@@ -200,19 +229,17 @@ public class CompactorMR {
* @param ci CompactionInfo
* @throws java.io.IOException if the job fails
*/
- void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds,
+ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
}
- // For MM tables we don't need to launch MR jobs as there is no compaction needed.
- // We just need to delete the directories for aborted transactions.
if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
- LOG.debug("Going to delete directories for aborted transactions for MM table "
- + t.getDbName() + "." + t.getTableName());
- removeFiles(conf, sd.getLocation(), writeIds, t);
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
+ runMmCompaction(conf, t, p, sd, writeIds, ci);
+ }
return;
}
@@ -294,6 +321,255 @@ public class CompactorMR {
su.gatherStats();
}
+ private void runMmCompaction(HiveConf conf, Table t, Partition p,
+ StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
+ LOG.debug("Going to delete directories for aborted transactions for MM table "
+ + t.getDbName() + "." + t.getTableName());
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
+ conf, writeIds, Ref.from(false), false, t.getParameters());
+ removeFilesForMmTable(conf, dir);
+
+ // Then, actually do the compaction.
+ if (!ci.isMajorCompaction()) {
+ // Not supported for MM tables right now.
+ LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
+ return;
+ }
+
+ int deltaCount = dir.getCurrentDirectories().size();
+ if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) {
+ LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
+ + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas");
+ return;
+ }
+ try {
+ String tmpLocation = generateTmpPath(sd);
+ Path baseLocation = new Path(tmpLocation, "_base");
+
+ // Set up the session for driver.
+ conf = new HiveConf(conf);
+ conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ SessionState sessionState = setUpSessionState(conf, user);
+
+ // Note: we could skip creating the table and just add table type stuff directly to the
+ // "insert overwrite directory" command if there were no bucketing or list bucketing.
+ String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_", tmpTableName;
+ while (true) {
+ tmpTableName = tmpPrefix + System.currentTimeMillis();
+ String query = buildMmCompactionCtQuery(tmpTableName, t,
+ p == null ? t.getSd() : p.getSd(), baseLocation.toString());
+ LOG.info("Compacting a MM table into " + query);
+ try {
+ runOnDriver(conf, user, sessionState, query, null);
+ break;
+ } catch (Exception ex) {
+ Throwable cause = ex;
+ while (cause != null && !(cause instanceof AlreadyExistsException)) {
+ cause = cause.getCause();
+ }
+ if (cause == null) {
+ throw new IOException(ex);
+ }
+ }
+ }
+
+ String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
+ LOG.info("Compacting a MM table via " + query);
+ runOnDriver(conf, user, sessionState, query, writeIds);
+ commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds);
+ runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName, null);
+ } catch (HiveException e) {
+ LOG.error("Error compacting a MM table", e);
+ throw new IOException(e);
+ }
+ }
+
+ public SessionState setUpSessionState(HiveConf conf, String user) {
+ SessionState sessionState = SessionState.get();
+ if (sessionState == null) {
+ // Note: we assume that workers run on the same threads repeatedly, so we can set up
+ // the session here and it will be reused without explicitly storing in the worker.
+ sessionState = new SessionState(conf, user);
+ SessionState.setCurrentSessionState(sessionState);
+ }
+ return sessionState;
+ }
+
+ private String generateTmpPath(StorageDescriptor sd) {
+ return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+ }
+
+ private String buildMmCompactionCtQuery(
+ String fullName, Table t, StorageDescriptor sd, String location) {
+ StringBuilder query = new StringBuilder("create temporary table ")
+ .append(fullName).append("(");
+ List<FieldSchema> cols = t.getSd().getCols();
+ boolean isFirst = true;
+ for (FieldSchema col : cols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("`").append(col.getName()).append("` ").append(col.getType());
+ }
+ query.append(") ");
+
+ // Bucketing.
+ List<String> buckCols = t.getSd().getBucketCols();
+ if (buckCols.size() > 0) {
+ query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+ List<Order> sortCols = t.getSd().getSortCols();
+ if (sortCols.size() > 0) {
+ query.append("SORTED BY (");
+ List<String> sortKeys = new ArrayList<String>();
+ isFirst = true;
+ for (Order sortCol : sortCols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append(sortCol.getCol()).append(" ");
+ if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) {
+ query.append("ASC");
+ } else if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_DESC) {
+ query.append("DESC");
+ }
+ }
+ query.append(") ");
+ }
+ query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
+ }
+
+ // Stored as directories. We don't care about the skew otherwise.
+ if (t.getSd().isStoredAsSubDirectories()) {
+ SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
+ if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+ query.append(" SKEWED BY (").append(
+ StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+ isFirst = true;
+ for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("('").append(StringUtils.join("','", colValues)).append("')");
+ }
+ query.append(") STORED AS DIRECTORIES");
+ }
+ }
+
+ SerDeInfo serdeInfo = sd.getSerdeInfo();
+ Map<String, String> serdeParams = serdeInfo.getParameters();
+ query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(
+ serdeInfo.getSerializationLib())).append("'");
+ String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+ assert sh == null; // Not supposed to be a compactable table.
+ if (!serdeParams.isEmpty()) {
+ DDLTask.appendSerdeParams(query, serdeParams);
+ }
+ query.append("STORED AS INPUTFORMAT '").append(
+ HiveStringUtils.escapeHiveCommand(sd.getInputFormat())).append("' OUTPUTFORMAT '").append(
+ HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("' LOCATION '").append(
+ HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+ // Exclude all standard table properties.
+ Set<String> excludes = getHiveMetastoreConstants();
+ excludes.addAll(Lists.newArrayList(StatsSetupConst.TABLE_PARAMS_STATS_KEYS));
+ isFirst = true;
+ for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
+ if (e.getValue() == null) continue;
+ if (excludes.contains(e.getKey())) continue;
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("'").append(e.getKey()).append("'='").append(
+ HiveStringUtils.escapeHiveCommand(e.getValue())).append("'");
+ }
+ if (!isFirst) {
+ query.append(", ");
+ }
+ query.append("'transactional'='false')");
+ return query.toString();
+
+ }
+
+ private static Set<String> getHiveMetastoreConstants() {
+ HashSet<String> result = new HashSet<>();
+ for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+ if (!Modifier.isStatic(f.getModifiers())) continue;
+ if (!Modifier.isFinal(f.getModifiers())) continue;
+ if (!String.class.equals(f.getType())) continue;
+ f.setAccessible(true);
+ try {
+ result.add((String)f.get(null));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+
+ private void runOnDriver(HiveConf conf, String user,
+ SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
+ boolean isOk = false;
+ try {
+ QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
+ Driver driver = new Driver(qs, user, null, null);
+ driver.setCompactionWriteIds(writeIds);
+ try {
+ CommandProcessorResponse cpr = driver.run(query);
+ if (cpr.getResponseCode() != 0) {
+ LOG.error("Failed to run " + query, cpr.getException());
+ throw new HiveException("Failed to run " + query, cpr.getException());
+ }
+ } finally {
+ driver.close();
+ driver.destroy();
+ }
+ isOk = true;
+ } finally {
+ if (!isOk) {
+ try {
+ sessionState.close(); // This also resets SessionState.get.
+ } catch (Throwable th) {
+ LOG.warn("Failed to close a bad session", th);
+ SessionState.detachSession();
+ }
+ }
+ }
+ }
+
+ private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
+ String fullName = t.getDbName() + "." + t.getTableName();
+ // TODO: ideally we should make a special form of insert overwrite so that we:
+ // 1) Could use fast merge path for ORC and RC.
+ // 2) Didn't have to create a table.
+
+ String query = "insert overwrite table " + tmpName + " ";
+ String filter = "";
+ if (p != null) {
+ filter = " where ";
+ List<String> vals = p.getValues();
+ List<FieldSchema> keys = t.getPartitionKeys();
+ assert keys.size() == vals.size();
+ for (int i = 0; i < keys.size(); ++i) {
+ filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
+ }
+ query += " select ";
+ // Use table descriptor for columns.
+ List<FieldSchema> cols = t.getSd().getCols();
+ for (int i = 0; i < cols.size(); ++i) {
+ query += (i == 0 ? "`" : ", `") + (cols.get(i).getName() + "`");
+ }
+ } else {
+ query += "select *";
+ }
+ query += " from " + fullName + filter;
+ return query;
+ }
+
/**
* @param baseDir if not null, it's either table/partition root folder or base_xxxx.
* If it's base_xxxx, it's in dirsToSearch, else the actual original files
@@ -309,6 +585,10 @@ public class CompactorMR {
dirsToSearch = new StringableList();
}
StringableList deltaDirs = new StringableList();
+ // Note: if compaction creates a delta, it won't replace an existing base dir, so the txn ID
+ // of the base dir won't be a part of delta's range. If otoh compaction creates a base,
+ // we don't care about this value because bases don't have min txn ID in the name.
+ // However logically this should also take base into account if it's included.
long minTxn = Long.MAX_VALUE;
long maxTxn = Long.MIN_VALUE;
for (AcidUtils.ParsedDelta delta : parsedDeltas) {
@@ -356,7 +636,7 @@ public class CompactorMR {
* to use.
* @param job the job to update
* @param cols the columns of the table
- * @param map
+ * @param map
*/
private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
StringBuilder colNames = new StringBuilder();
@@ -378,10 +658,7 @@ public class CompactorMR {
}
// Remove the directories for aborted transactions only
- private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t)
- throws IOException {
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList,
- Ref.from(false), false, t.getParameters());
+ private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
// For MM table, we only want to delete delta dirs for aborted txns.
List<FileStatus> abortedDirs = dir.getAbortedDirectories();
List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
@@ -389,11 +666,9 @@ public class CompactorMR {
filesToDelete.add(stat.getPath());
}
if (filesToDelete.size() < 1) {
- LOG.warn("Hmm, nothing to delete in the worker for directory " + location +
- ", that hardly seems right.");
return;
}
- LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location);
+ LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
for (Path dead : filesToDelete) {
LOG.debug("Going to delete path " + dead.toString());
@@ -940,6 +1215,7 @@ public class CompactorMR {
FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
//we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
//name is that we want to rename; leave it for another day
+ // TODO: if we expect one dir why don't we enforce it?
for (FileStatus fileStatus : contents) {
//newPath is the base/delta dir
Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
@@ -969,4 +1245,35 @@ public class CompactorMR {
fs.delete(tmpLocation, true);
}
}
+
+ /**
+ * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
+ * @param from The temp directory used for compactor output. Not the actual base/delta.
+ * @param to The final directory; basically a SD directory. Not the actual base/delta.
+ */
+ private void commitMmCompaction(String from, String to, Configuration conf,
+ ValidWriteIdList actualWriteIds) throws IOException {
+ Path fromPath = new Path(from), toPath = new Path(to);
+ FileSystem fs = fromPath.getFileSystem(conf);
+ // Assume the high watermark can be used as maximum transaction ID.
+ long maxTxn = actualWriteIds.getHighWatermark();
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1);
+ Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+ if (!fs.exists(fromPath)) {
+ LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
+ fs.mkdirs(newBaseDir);
+ AcidUtils.MetaDataFile.createCompactorMarker(toPath, fs);
+ return;
+ }
+ LOG.info("Moving contents of " + from + " to " + to);
+ FileStatus[] children = fs.listStatus(fromPath);
+ if (children.length != 1) {
+ throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
+ }
+ FileStatus dirPath = children[0];
+ fs.rename(dirPath.getPath(), newBaseDir);
+ AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs);
+ fs.delete(fromPath, true);
+ }
}
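The destination base directory name in commitMmCompaction is derived from the write-id high watermark via the ACID filename helper; just the naming step, as a sketch (toPath is the table/partition root):

  long maxWriteId = actualWriteIds.getHighWatermark();
  AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
      .writingBase(true).isCompressed(false).maximumWriteId(maxWriteId).bucket(0).statementId(-1);
  Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
  // e.g. a high watermark of 5 yields <root>/base_0000005, matching the tests above.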
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index c95daaf..a61b6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -240,11 +240,6 @@ public class Initiator extends CompactorThread {
return CompactionType.MAJOR;
}
- // If it is for insert-only transactional table, return null.
- if (AcidUtils.isInsertOnlyTable(tblproperties)) {
- return null;
- }
-
if (runJobAsSelf(runAs)) {
return determineCompactionType(ci, writeIds, sd, tblproperties);
} else {
@@ -333,14 +328,20 @@ public class Initiator extends CompactorThread {
HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
Integer.parseInt(deltaNumProp);
boolean enough = deltas.size() > deltaNumThreshold;
- if (enough) {
- LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
- (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
- " compaction");
- // If there's no base file, do a major compaction
- return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
+ if (!enough) {
+ return null;
+ }
+ if (AcidUtils.isInsertOnlyTable(tblproperties)) {
+ LOG.debug("Requesting a major compaction for a MM table; found " + deltas.size()
+ + " delta files, threshold is " + deltaNumThreshold);
+ return CompactionType.MAJOR;
}
- return null;
+ // TODO: this log statement looks wrong
+ LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
+ (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
+ " compaction");
+ // If there's no base file, do a major compaction
+ return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
}
private long sumDirSize(FileSystem fs, Path dir) throws IOException {
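Net effect of the Initiator change, condensed as a sketch: once the delta threshold is crossed, insert-only tables always get MAJOR (MINOR is not yet supported for MM), while full-ACID tables keep the old base/no-base rule:

  if (deltas.size() <= deltaNumThreshold) {
    return null; // not enough deltas to bother
  }
  if (AcidUtils.isInsertOnlyTable(tblproperties)) {
    return CompactionType.MAJOR; // MM tables: major only
  }
  return noBase ? CompactionType.MAJOR : CompactionType.MINOR;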
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index fe0aaa4..7461299 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -84,6 +84,7 @@ public class Worker extends CompactorThread {
// so wrap it in a big catch Throwable statement.
try {
final CompactionInfo ci = txnHandler.findNextToCompact(name);
+ LOG.debug("Processing compaction request " + ci);
if (ci == null && !stop.get()) {
try {
@@ -170,14 +171,15 @@ public class Worker extends CompactorThread {
launchedJob = true;
try {
if (runJobAsSelf(runAs)) {
- mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+ mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, txnHandler);
} else {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
UserGroupInformation.getLoginUser());
+ final Partition fp = p;
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+ mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, txnHandler);
return null;
}
});
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index c053860..f357275 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -200,32 +201,7 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
}
- // 2. Perform a major compaction.
- runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'");
- runWorker(hiveConf);
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // There should be 2 delta dirs.
- Assert.assertEquals(2, status.length);
- boolean sawBase = false;
- int deltaCount = 0;
- for (int i = 0; i < status.length; i++) {
- String dirName = status[i].getPath().getName();
- if (dirName.matches("delta_.*")) {
- deltaCount++;
- } else {
- sawBase = true;
- Assert.assertTrue(dirName.matches("base_.*"));
- }
- }
- Assert.assertEquals(2, deltaCount);
- Assert.assertFalse(sawBase);
- // Verify query result
- int [][] resultData = new int[][] {{1,2},{3,4}};
- List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
- Assert.assertEquals(stringifyValues(resultData), rs);
-
- // 3. INSERT OVERWRITE
+ // 2. INSERT OVERWRITE
// Prepare data for the source table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)");
// Insert overwrite MM table from source table
@@ -235,40 +211,12 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
// There should be 2 delta dirs, plus 1 base dir in the location
Assert.assertEquals(3, status.length);
int baseCount = 0;
- deltaCount = 0;
- for (int i = 0; i < status.length; i++) {
- String dirName = status[i].getPath().getName();
- if (dirName.matches("delta_.*")) {
- deltaCount++;
- } else {
- baseCount++;
- }
- }
- Assert.assertEquals(2, deltaCount);
- Assert.assertEquals(1, baseCount);
-
- // Verify query result
- resultData = new int[][] {{5,6},{7,8}};
- rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
- Assert.assertEquals(stringifyValues(resultData), rs);
-
- // 4. Perform a minor compaction. Nothing should change.
- // Both deltas and the base dir should have the same name.
- // Re-verify directory layout and query result by using the same logic as above
- runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
- runWorker(hiveConf);
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // There should be 2 delta dirs, plus 1 base dir in the location
- Assert.assertEquals(3, status.length);
- baseCount = 0;
- deltaCount = 0;
+ int deltaCount = 0;
for (int i = 0; i < status.length; i++) {
String dirName = status[i].getPath().getName();
if (dirName.matches("delta_.*")) {
deltaCount++;
} else {
- Assert.assertTrue(dirName.matches("base_.*"));
baseCount++;
}
}
@@ -276,19 +224,8 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
Assert.assertEquals(1, baseCount);
// Verify query result
- rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
- Assert.assertEquals(stringifyValues(resultData), rs);
-
- // 5. Run Cleaner. It should remove the 2 delta dirs.
- runCleaner(hiveConf);
- // There should be only 1 directory left: base_xxxxxxx.
- // The delta dirs should have been cleaned up.
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(1, status.length);
- Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
- // Verify query result
- rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+ int[][] resultData = new int[][] {{5,6},{7,8}};
+ List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
Assert.assertEquals(stringifyValues(resultData), rs);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index cb1d40a..2b35e6f 100644
--- a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -6,10 +6,11 @@
*/
package org.apache.hadoop.hive.metastore.api;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Field;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.SchemeFactory;
import org.apache.thrift.scheme.StandardScheme;
-
import org.apache.thrift.scheme.TupleScheme;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.protocol.TProtocolException;
@@ -34,7 +35,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@org.apache.hadoop.classification.InterfaceAudience.Public @org.apache.hadoop.classification.InterfaceStability.Stable public class hive_metastoreConstants {
+@org.apache.hadoop.classification.InterfaceAudience.Public
+@org.apache.hadoop.classification.InterfaceStability.Stable
+public class hive_metastoreConstants {
public static final String DDL_TIME = "transient_lastDdlTime";
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 5a0cc6a..c766f10 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2392,10 +2392,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
@Override
- public ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
- throws TException {
+ public List<TableValidWriteIds> getValidWriteIds(
+ List<String> tablesList, String validTxnList) throws TException {
GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList);
- return TxnUtils.createValidTxnWriteIdList(currentTxnId, client.get_valid_write_ids(rqst));
+ return client.get_valid_write_ids(rqst).getTblValidWriteIds();
}
@Override
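
With this change getValidWriteIds returns the raw per-table write id lists and no longer
assembles the transaction-level snapshot itself; a caller that still needs a
ValidTxnWriteIdList now combines the result with TxnUtils, roughly as in this sketch
(FetchSnapshot and fetch are illustrative names, not part of the patch):

    import java.util.List;
    import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;
    import org.apache.thrift.TException;

    final class FetchSnapshot {
      // Fetch the raw per-table lists, then build the txn-level snapshot on
      // the caller side, which the client method used to do internally.
      static ValidTxnWriteIdList fetch(IMetaStoreClient client, long currentTxnId,
          List<String> tablesList, String validTxnList) throws TException {
        List<TableValidWriteIds> perTable = client.getValidWriteIds(tablesList, validTxnList);
        return TxnUtils.createValidTxnWriteIdList(currentTxnId, perTable);
      }
    }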
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index aee416d..72b814d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnOpenException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
@@ -2749,14 +2750,13 @@ public interface IMetaStoreClient {
/**
* Get the lists of valid write ids for all tables read by the current txn.
- * @param currentTxnId current txn ID for which we try to get valid write ids list
* @param tablesList list of tables (format: <db_name>.<table_name>) read by the current
* transaction, for which the valid write ids need to be populated
* @param validTxnList snapshot of valid txns for the current txn
* @return list of valid write ids for the given list of tables.
* @throws TException
*/
- ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
+ List<TableValidWriteIds> getValidWriteIds(List<String> tablesList, String validTxnList)
throws TException;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 7b02865..1880d44 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
@@ -80,13 +81,13 @@ public class TxnUtils {
* {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}. This assumes that the caller intends to
* read the files, and thus treats both open and aborted transactions as invalid.
* @param currentTxnId current txn ID for which we get the valid write ids list
- * @param validWriteIds valid write ids list from the metastore
+ * @param validIds valid write ids list from the metastore
* @return a valid write IDs list for the whole transaction.
*/
public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
- GetValidWriteIdsResponse validWriteIds) {
+ List<TableValidWriteIds> validIds) {
ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
- for (TableValidWriteIds tableWriteIds : validWriteIds.getTblValidWriteIds()) {
+ for (TableValidWriteIds tableWriteIds : validIds) {
validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
}
return validTxnWriteIdList;
@@ -155,6 +156,17 @@ public class TxnUtils {
}
}
+ public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
+ // This is based on the existing valid write ID list that was built for a select query;
+ // therefore we assume all the aborted txns, etc. were already accounted for.
+ // All we do is adjust the high watermark to only include contiguous txns.
+ Long minOpenWriteId = ids.getMinOpenWriteId();
+ if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
+ return ids.updateHighWatermark(minOpenWriteId - 1);
+ }
+ return ids;
+ }
+
/**
* Get an instance of the TxnStore that is appropriate for this store
* @param conf configuration
@@ -212,7 +224,7 @@ public class TxnUtils {
/**
- * Build a query (or queries if one query is too big but only for the case of 'IN'
+ * Build a query (or queries, if one query is too big; this applies only to 'IN'
* composite clauses, since splitting a 'NOT IN' clause into multiple queries would
* change the semantics of the intended query).
* E.g., let's assume that the input "inList" parameter has [5, 6] and that
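
To make the watermark adjustment in updateForCompactionQuery concrete, here is a small
self-contained sketch; the snapshot values are invented for illustration. With write id 7
open and write id 9 aborted under a high watermark of 10, the compaction query may only
read the contiguous committed prefix 1..6:

    import java.util.BitSet;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class CompactionWatermarkExample {
      public static void main(String[] args) {
        // Snapshot as built for a select query: high watermark 10; write id 7
        // is open, write id 9 is aborted (bit 1 marks exceptions[1] as aborted).
        long[] exceptions = {7L, 9L};
        BitSet abortedBits = new BitSet(2);
        abortedBits.set(1);
        ValidReaderWriteIdList ids = new ValidReaderWriteIdList(
            "default.mmtbl", exceptions, abortedBits, 10L, 7L);

        // minOpenWriteId is 7, so the watermark is trimmed to 6: write ids past
        // the first open one may still commit later, and a major compaction
        // must not bake such gaps into the new base.
        ValidReaderWriteIdList forCompaction = TxnUtils.updateForCompactionQuery(ids);
        System.out.println(forCompaction.getHighWatermark()); // 6
      }
    }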
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index bf87cfc..8ae899f 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2140,10 +2140,10 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
}
@Override
- public ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
+ public List<TableValidWriteIds> getValidWriteIds(List<String> tablesList, String validTxnList)
throws TException {
GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList);
- return TxnUtils.createValidTxnWriteIdList(currentTxnId, client.get_valid_write_ids(rqst));
+ return client.get_valid_write_ids(rqst).getTblValidWriteIds();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
index 107ea90..95a0b56 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
@@ -250,5 +250,9 @@ public class ValidReaderWriteIdList implements ValidWriteIdList {
return RangeResponse.SOME;
}
}
+
+ public ValidReaderWriteIdList updateHighWatermark(long value) {
+ return new ValidReaderWriteIdList(tableName, exceptions, abortedBits, value, minOpenWriteId);
+ }
}
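
Because updateHighWatermark returns a fresh ValidReaderWriteIdList rather than mutating
the receiver, updateForCompactionQuery can return either the original or the capped list
without side effects. A small illustration (table name and values are arbitrary):

    import java.util.BitSet;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;

    public class UpdateHighWatermarkExample {
      public static void main(String[] args) {
        // Arbitrary snapshot: watermark 10, no open or aborted write ids.
        ValidReaderWriteIdList ids = new ValidReaderWriteIdList(
            "default.t", new long[0], new BitSet(), 10L, Long.MAX_VALUE);
        ValidReaderWriteIdList capped = ids.updateHighWatermark(6L);
        System.out.println(ids.getHighWatermark());    // still 10: receiver unchanged
        System.out.println(capped.getHighWatermark()); // 6: capped copy
      }
    }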