Posted to commits@hive.apache.org by ek...@apache.org on 2019/01/11 20:54:45 UTC
hive git commit: HIVE-20960 Make MM compactor run in a transaction and remove CompactorMR.createCompactorMarker() (Eugene Koifman, reviewed by Vaibhav Gumashta)
Repository: hive
Updated Branches:
refs/heads/master f713140ba -> cb9d5ccd8
HIVE-20960 Make MM compactor run in a transaction and remove CompactorMR.createCompactorMarker() (Eugene Koifman, reviewed by Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb9d5ccd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb9d5ccd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb9d5ccd
Branch: refs/heads/master
Commit: cb9d5ccd87997985b4623369247aead32bd91390
Parents: f713140
Author: Eugene Koifman <ek...@apache.org>
Authored: Fri Jan 11 12:54:28 2019 -0800
Committer: Eugene Koifman <ek...@apache.org>
Committed: Fri Jan 11 12:54:28 2019 -0800
----------------------------------------------------------------------
.../hive/ql/txn/compactor/TestCompactor.java | 76 ++++++++++++-------
.../java/org/apache/hadoop/hive/ql/Driver.java | 13 +++-
.../org/apache/hadoop/hive/ql/DriverUtils.java | 18 ++++-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 78 ++++++++++----------
.../hive/ql/stats/StatsUpdaterThread.java | 2 +-
.../hive/ql/txn/compactor/CompactorMR.java | 59 +++++++--------
.../hadoop/hive/ql/txn/compactor/Worker.java | 7 ++
.../hadoop/hive/ql/TestTxnConcatenate.java | 28 +++----
.../hive/ql/stats/TestStatsUpdaterThread.java | 2 +-
9 files changed, 160 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
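For context before the per-file hunks: the user-visible effect of running the compactor in its own transaction is that compactor-written base directories now carry a visibility suffix, e.g. base_0000002_v0000006, where 0000002 is the highest write id compacted and 0000006 is the compactor's txn id. A minimal sketch of the naming convention as it appears in the updated tests (BaseName is a hypothetical helper for illustration only; the real names come from AcidUtils.createFilename()):

    // Hypothetical illustration; not part of this patch.
    final class BaseName {
      static String compactedBase(long writeId, long visibilityTxnId) {
        // e.g. compactedBase(2, 6) -> "base_0000002_v0000006"
        return String.format("base_%07d_v%07d", writeId, visibilityTxnId);
      }
      static String insertOverwriteBase(long writeId) {
        // an Insert Overwrite base has no _v suffix, e.g. "base_0000002"
        return String.format("base_%07d", writeId);
      }
    }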
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 2b22a62..dc7b287 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -97,12 +96,10 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@RunWith(Parameterized.class)
public class TestCompactor {
private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt());
private static final Logger LOG = LoggerFactory.getLogger(TestCompactor.class);
@@ -117,12 +114,6 @@ public class TestCompactor {
return Arrays.asList(new Object[][]{{true}, {false}});
}
- private boolean newStreamingAPI;
-
- public TestCompactor(boolean newStreamingAPI) {
- this.newStreamingAPI = newStreamingAPI;
- }
-
@Rule
public TemporaryFolder stagingFolder = new TemporaryFolder();
@@ -366,6 +357,13 @@ public class TestCompactor {
*/
@Test
public void testStatsAfterCompactionPartTbl() throws Exception {
+ testStatsAfterCompactionPartTbl(false);
+ }
+ @Test
+ public void testStatsAfterCompactionPartTblNew() throws Exception {
+ testStatsAfterCompactionPartTbl(true);
+ }
+ private void testStatsAfterCompactionPartTbl(boolean newStreamingAPI) throws Exception {
//as of (8/27/2014) Hive 0.14, ACID/Orc requires HiveInputFormat
String tblName = "compaction_test";
String tblNameStg = tblName + "_stg";
@@ -710,6 +708,13 @@ public class TestCompactor {
@Test
public void minorCompactAfterAbort() throws Exception {
+ minorCompactAfterAbort(false);
+ }
+ @Test
+ public void minorCompactAfterAbortNew() throws Exception {
+ minorCompactAfterAbort(true);
+ }
+ private void minorCompactAfterAbort(boolean newStreamingAPI) throws Exception {
String dbName = "default";
String tblName = "cws";
String columnNamesProperty = "a,b";
@@ -719,7 +724,7 @@ public class TestCompactor {
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
" STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
- processStreamingAPI(dbName, tblName);
+ processStreamingAPI(dbName, tblName, newStreamingAPI);
// Now, compact
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
@@ -750,6 +755,13 @@ public class TestCompactor {
@Test
public void majorCompactAfterAbort() throws Exception {
+ majorCompactAfterAbort(false);
+ }
+ @Test
+ public void majorCompactAfterAbortNew() throws Exception {
+ majorCompactAfterAbort(true);
+ }
+ private void majorCompactAfterAbort(boolean newStreamingAPI) throws Exception {
String dbName = "default";
String tblName = "cws";
String columnNamesProperty = "a,b";
@@ -759,7 +771,7 @@ public class TestCompactor {
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
" STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
- processStreamingAPI(dbName, tblName);
+ processStreamingAPI(dbName, tblName, newStreamingAPI);
runMajorCompaction(dbName, tblName);
// Find the location of the table
@@ -805,12 +817,12 @@ public class TestCompactor {
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 1);
- verifyHasBase(table.getSd(), fs, "base_0000002");
+ verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
// Make sure we don't compact if we don't need to compact.
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 1);
- verifyHasBase(table.getSd(), fs, "base_0000002");
+ verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
}
@Test
@@ -849,7 +861,7 @@ public class TestCompactor {
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 3);
- verifyHasBase(table.getSd(), fs, "base_0000001");
+ verifyHasBase(table.getSd(), fs, "base_0000001_v0000009");
// Try with an extra delta.
executeStatementOnDriver("drop table if exists " + tblName, driver);
@@ -875,7 +887,7 @@ public class TestCompactor {
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 9);
- verifyHasBase(table.getSd(), fs, "base_0000002");
+ verifyHasBase(table.getSd(), fs, "base_0000002_v0000023");
// Try with an extra base.
executeStatementOnDriver("drop table if exists " + tblName, driver);
@@ -926,7 +938,7 @@ public class TestCompactor {
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 1);
- String baseDir = "base_0000002";
+ String baseDir = "base_0000002_v0000006";
verifyHasBase(table.getSd(), fs, baseDir);
FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), baseDir),
@@ -962,7 +974,7 @@ public class TestCompactor {
runMajorCompaction(dbName, tblName); // Don't compact 4 and 5; 3 is opened.
FileSystem fs = FileSystem.get(conf);
- verifyHasBase(table.getSd(), fs, "base_0000002");
+ verifyHasBase(table.getSd(), fs, "base_0000002_v0000010");
verifyDirCount(table.getSd(), fs, 1, AcidUtils.baseFileFilter);
verifyFooBarResult(tblName, 2);
@@ -974,7 +986,7 @@ public class TestCompactor {
msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
runMajorCompaction(dbName, tblName); // Compact 4 and 5.
verifyFooBarResult(tblName, 2);
- verifyHasBase(table.getSd(), fs, "base_0000005");
+ verifyHasBase(table.getSd(), fs, "base_0000005_v0000016");
runCleaner(conf);
verifyDeltaCount(table.getSd(), fs, 0);
}
@@ -1038,8 +1050,8 @@ public class TestCompactor {
verifyFooBarResult(tblName, 3);
verifyDeltaCount(p3.getSd(), fs, 1);
- verifyHasBase(p1.getSd(), fs, "base_0000006");
- verifyHasBase(p2.getSd(), fs, "base_0000006");
+ verifyHasBase(p1.getSd(), fs, "base_0000006_v0000010");
+ verifyHasBase(p2.getSd(), fs, "base_0000006_v0000014");
executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
@@ -1049,8 +1061,8 @@ public class TestCompactor {
// Make sure we don't compact if we don't need to compact; but do if we do.
verifyFooBarResult(tblName, 4);
verifyDeltaCount(p3.getSd(), fs, 1);
- verifyHasBase(p1.getSd(), fs, "base_0000006");
- verifyHasBase(p2.getSd(), fs, "base_0000008");
+ verifyHasBase(p1.getSd(), fs, "base_0000006_v0000010");
+ verifyHasBase(p2.getSd(), fs, "base_0000008_v0000023");
}
@@ -1095,6 +1107,13 @@ public class TestCompactor {
@Test
public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
+ majorCompactWhileStreamingForSplitUpdate(false);
+ }
+ @Test
+ public void majorCompactWhileStreamingForSplitUpdateNew() throws Exception {
+ majorCompactWhileStreamingForSplitUpdate(true);
+ }
+ private void majorCompactWhileStreamingForSplitUpdate(boolean newStreamingAPI) throws Exception {
String dbName = "default";
String tblName = "cws";
String columnNamesProperty = "a,b";
@@ -1283,6 +1302,13 @@ public class TestCompactor {
@Test
public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
+ minorCompactWhileStreamingWithSplitUpdate(false);
+ }
+ @Test
+ public void minorCompactWhileStreamingWithSplitUpdateNew() throws Exception {
+ minorCompactWhileStreamingWithSplitUpdate(true);
+ }
+ private void minorCompactWhileStreamingWithSplitUpdate(boolean newStreamingAPI) throws Exception {
String dbName = "default";
String tblName = "cws";
String columnNamesProperty = "a,b";
@@ -1713,9 +1739,9 @@ public class TestCompactor {
}
- private void processStreamingAPI(String dbName, String tblName)
- throws StreamingException, ClassNotFoundException, org.apache.hive.hcatalog.streaming.StreamingException,
- InterruptedException {
+ private void processStreamingAPI(String dbName, String tblName, boolean newStreamingAPI)
+ throws StreamingException, ClassNotFoundException,
+ org.apache.hive.hcatalog.streaming.StreamingException, InterruptedException {
if (newStreamingAPI) {
StreamingConnection connection = null;
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 608cbd5..01ecf0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -223,6 +223,7 @@ public class Driver implements IDriver {
private CacheUsage cacheUsage;
private CacheEntry usedCacheEntry;
private ValidWriteIdList compactionWriteIds = null;
+ private long compactorTxnId = 0;
private Context backupContext = null;
private boolean retrial = false;
@@ -1481,11 +1482,18 @@ public class Driver implements IDriver {
List<String> txnTables = getTransactionalTableList(plan);
ValidTxnWriteIdList txnWriteIds = null;
if (compactionWriteIds != null) {
+ /**
+ * This is kludgy: here we need to read with the Compactor's snapshot/txn
+ * rather than the snapshot of the current {@code txnMgr}, in effect
+ * simulating a "flashback query", but we can't actually share the
+ * compactor's txn since it would run multiple statements. See more
+ * comments in {@link org.apache.hadoop.hive.ql.txn.compactor.Worker}
+ * where it starts the compactor txn.*/
if (txnTables.size() != 1) {
throw new LockException("Unexpected tables in compaction: " + txnTables);
}
String fullTableName = txnTables.get(0);
- txnWriteIds = new ValidTxnWriteIdList(0L); // No transaction for the compaction for now. todo: Since MM compaction is a query, a txn has been opened at this point
+ txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
} else {
txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
@@ -3015,7 +3023,8 @@ public class Driver implements IDriver {
}
}
- public void setCompactionWriteIds(ValidWriteIdList val) {
+ public void setCompactionWriteIds(ValidWriteIdList val, long compactorTxnId) {
this.compactionWriteIds = val;
+ this.compactorTxnId = compactorTxnId;
}
}
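To make the flow above concrete: the compaction query's snapshot is now seeded from the compactor's txn rather than the old placeholder txn 0. A rough sketch of the sequence, assuming the caller has already opened the compactor txn and resolved the table's write ids (both happen in Worker/CompactorMR below; error handling omitted):

    Driver driver = new Driver(queryState, user, null, null);
    driver.setCompactionWriteIds(writeIds, compactorTxnId); // new 2-arg setter above
    // inside compile: txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
    //                 txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
    driver.run(query); // reads as of the compactor's snapshot -- the "flashback query"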
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
index 8228109..a2c6862 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -28,14 +29,27 @@ import org.slf4j.LoggerFactory;
public class DriverUtils {
private static final Logger LOG = LoggerFactory.getLogger(DriverUtils.class);
+ public static void runOnDriver(HiveConf conf, String user, SessionState sessionState,
+ String query) throws HiveException {
+ runOnDriver(conf, user, sessionState, query, null, -1);
+ }
+
+ /**
+ * Used by query-based compaction to run the query that generates the compacted data.
+ */
public static void runOnDriver(HiveConf conf, String user,
- SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
+ SessionState sessionState, String query, ValidWriteIdList writeIds, long compactorTxnId)
+ throws HiveException {
+ if(writeIds != null && compactorTxnId < 0) {
+ throw new IllegalArgumentException(JavaUtils.txnIdToString(compactorTxnId) +
+ " is not valid. Context: " + query);
+ }
SessionState.setCurrentSessionState(sessionState);
boolean isOk = false;
try {
QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
Driver driver = new Driver(qs, user, null, null);
- driver.setCompactionWriteIds(writeIds);
+ driver.setCompactionWriteIds(writeIds, compactorTxnId);
try {
CommandProcessorResponse cpr = driver.run(query);
if (cpr.getResponseCode() != 0) {
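The resulting call shapes, as used elsewhere in this patch (a hedged sketch; conf, user and sessionState are assumed to be set up as in CompactorMR below):

    // Non-compaction callers (StatsUpdaterThread, tests) use the new convenience overload:
    DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists tmp_t");
    // The query-based MM compaction path must supply both the snapshot and the txn id,
    // otherwise the IllegalArgumentException above fires:
    DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);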
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index badcc55..5dbf634 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -27,7 +27,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -774,12 +773,14 @@ public class AcidUtils {
public static final class ParsedBase {
private final long writeId;
private final long visibilityTxnId;
- ParsedBase(long writeId) {
- this(writeId, 0);
+ private final Path baseDirPath;
+ ParsedBase(long writeId, Path baseDirPath) {
+ this(writeId, 0, baseDirPath);
}
- ParsedBase(long writeId, long visibilityTxnId) {
+ ParsedBase(long writeId, long visibilityTxnId, Path baseDirPath) {
this.writeId = writeId;
this.visibilityTxnId = visibilityTxnId;
+ this.baseDirPath = baseDirPath;
}
public long getWriteId() {
return writeId;
@@ -787,6 +788,9 @@ public class AcidUtils {
public long getVisibilityTxnId() {
return visibilityTxnId;
}
+ public Path getBaseDirPath() {
+ return baseDirPath;
+ }
public static ParsedBase parseBase(Path path) {
String filename = path.getName();
if(!filename.startsWith(BASE_PREFIX)) {
@@ -794,10 +798,10 @@ public class AcidUtils {
}
int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
if(idxOfv < 0) {
- return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length())));
+ return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length())), path);
}
return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length(), idxOfv)),
- Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())));
+ Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path);
}
}
/**
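A short usage sketch of the extended ParsedBase, with directory names taken from the updated tests (Path is org.apache.hadoop.fs.Path; the expected values are shown as comments):

    AcidUtils.ParsedBase pb =
        AcidUtils.ParsedBase.parseBase(new Path("/warehouse/t/base_0000002_v0000006"));
    // pb.getWriteId() == 2, pb.getVisibilityTxnId() == 6,
    // pb.getBaseDirPath().getName().equals("base_0000002_v0000006")
    AcidUtils.ParsedBase iow =
        AcidUtils.ParsedBase.parseBase(new Path("/warehouse/t/base_0000002"));
    // no _v suffix (e.g. Insert Overwrite), so iow.getVisibilityTxnId() == 0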
@@ -1229,7 +1233,7 @@ public class AcidUtils {
}
if(bestBase.oldestBase != null && bestBase.status == null &&
- MetaDataFile.isCompacted(bestBase.oldestBase, fs)) {
+ isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs)) {
/**
* If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
* {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus
@@ -1276,23 +1280,33 @@ public class AcidUtils {
* Note that such base is NOT obsolete. Obsolete files are those that are "covered" by other
* files within the snapshot.
* A base produced by Insert Overwrite is different. Logically it's a delta file but one that
- * causes anything written previously is ignored (hence the overwrite). In this case, base_x
+ * causes anything written previously to be ignored (hence the overwrite). In this case, base_x
* is visible if writeid:x is committed for current reader.
*/
- private static boolean isValidBase(long baseWriteId, ValidWriteIdList writeIdList, Path baseDir,
+ private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList,
FileSystem fs) throws IOException {
- if(baseWriteId == Long.MIN_VALUE) {
+ if(parsedBase.getWriteId() == Long.MIN_VALUE) {
//such base is created by 1st compaction in case of non-acid to acid table conversion
//By definition there are no open txns with id < 1.
return true;
}
- if(!MetaDataFile.isCompacted(baseDir, fs)) {
- //this is the IOW case
- return writeIdList.isWriteIdValid(baseWriteId);
+ if(isCompactedBase(parsedBase, fs)) {
+ return writeIdList.isValidBase(parsedBase.getWriteId());
}
- return writeIdList.isValidBase(baseWriteId);
+ //if here, it's a result of IOW
+ return writeIdList.isWriteIdValid(parsedBase.getWriteId());
}
+ /**
+ * Returns {@code true} if {@code parsedBase} was created by compaction.
+ * As of Hive 4.0 we can tell if a directory is a result of compaction based on the
+ * presence of the {@link AcidUtils#VISIBILITY_PATTERN} suffix. Base directories written
+ * prior to that have to rely on the {@link MetaDataFile} in the directory, so look at
+ * the filename first since that is the cheaper test.*/
+ private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException {
+ return parsedBase.getVisibilityTxnId() > 0 ||
+ MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
+ }
private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
ValidWriteIdList writeIdList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
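The check order in isCompactedBase() is a cost optimization: the suffix test is pure string parsing already done by ParsedBase, while the fallback costs a filesystem round trip. Illustrated on the two directory shapes above:

    // base_0000002_v0000006 : visibilityTxnId == 6 > 0  -> compacted; no fs call needed.
    // base_0000002          : visibilityTxnId == 0      -> probe the dir for the pre-4.0
    //                         marker via MetaDataFile.isCompacted(), roughly one
    //                         fs.exists() on the METADATA_FILE path.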
@@ -1307,23 +1321,23 @@ public class AcidUtils {
return;
}
if (fn.startsWith(BASE_PREFIX)) {
- ParsedBase parsedBase = ParsedBase.parseBase(p);
+ ParsedBase parsedBase = ParsedBase.parseBase(p);
if(!isDirUsable(child, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
return;
}
- long writeId = parsedBase.getWriteId();
+ final long writeId = parsedBase.getWriteId();
if(bestBase.oldestBaseWriteId > writeId) {
//keep track for error reporting
bestBase.oldestBase = p;
bestBase.oldestBaseWriteId = writeId;
}
if (bestBase.status == null) {
- if(isValidBase(writeId, writeIdList, p, fs)) {
+ if(isValidBase(parsedBase, writeIdList, fs)) {
bestBase.status = child;
bestBase.writeId = writeId;
}
} else if (bestBase.writeId < writeId) {
- if(isValidBase(writeId, writeIdList, p, fs)) {
+ if(isValidBase(parsedBase, writeIdList, fs)) {
obsolete.add(bestBase.status);
bestBase.status = child;
bestBase.writeId = writeId;
@@ -1943,31 +1957,13 @@ public class AcidUtils {
String COMPACTED = "compacted";
}
- /**
- * @param baseOrDeltaDir detla or base dir, must exist
- */
- public static void createCompactorMarker(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+ static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOException {
/**
- * create _meta_data json file in baseOrDeltaDir
- * write thisFileVersion, dataFormat
- *
- * on read if the file is not there, assume version 0 and dataFormat=acid
+ * this file was written by Hive versions before 4.0 into a base_x/ dir
+ * created by the compactor, so that such a dir can be distinguished from one
+ * created by Insert Overwrite
*/
Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
- Map<String, String> metaData = new HashMap<>();
- metaData.put(Field.VERSION, CURRENT_VERSION);
- metaData.put(Field.DATA_FORMAT, Value.COMPACTED);
- try (FSDataOutputStream strm = fs.create(formatFile, false)) {
- new ObjectMapper().writeValue(strm, metaData);
- } catch (IOException ioe) {
- String msg = "Failed to create " + baseOrDeltaDir + "/" + METADATA_FILE
- + ": " + ioe.getMessage();
- LOG.error(msg, ioe);
- throw ioe;
- }
- }
- static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOException {
- Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
if(!fs.exists(formatFile)) {
return false;
}
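For reference, the deleted createCompactorMarker() above wrote the marker as a one-object JSON file. Reconstructed from the removed lines (key names per the deleted "write thisFileVersion, dataFormat" comment, value per the COMPACTED constant above), its content looked roughly like:

    { "thisFileVersion": "0", "dataFormat": "compacted" }

and, per the removed comment, readers finding no such file assume version 0 and dataFormat=acid.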
@@ -1995,7 +1991,7 @@ public class AcidUtils {
}
/**
- * Chooses 1 representantive file from {@code baseOrDeltaDir}
+ * Chooses 1 representative file from {@code baseOrDeltaDir}
* This assumes that all files in the dir are of the same type: either written by an acid
* write or Load Data. This should always be the case for an Acid table.
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index 302269b..c028e12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -616,7 +616,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
if (doWait) {
SessionState.start(ss); // This is the first call, open the session
}
- DriverUtils.runOnDriver(conf, user, ss, cmd, null);
+ DriverUtils.runOnDriver(conf, user, ss, cmd);
} catch (Exception e) {
LOG.error("Analyze command failed: " + cmd, e);
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d5ee4a..dc05e19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -346,12 +346,13 @@ public class CompactorMR {
Path baseLocation = new Path(tmpLocation, "_base");
// Set up the session for driver.
- conf = new HiveConf(conf);
- conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
- conf.unset(ValidTxnList.VALID_TXNS_KEY);//so Driver doesn't get confused
+ HiveConf driverConf = new HiveConf(conf);
+ driverConf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+ driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused
+ //thinking it already has a txn opened
String user = UserGroupInformation.getCurrentUser().getShortUserName();
- SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false);
+ SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, false);
// Note: we could skip creating the table and just add table type stuff directly to the
// "insert overwrite directory" command if there were no bucketing or list bucketing.
@@ -363,7 +364,7 @@ public class CompactorMR {
p == null ? t.getSd() : p.getSd(), baseLocation.toString());
LOG.info("Compacting a MM table into " + query);
try {
- DriverUtils.runOnDriver(conf, user, sessionState, query, null);
+ DriverUtils.runOnDriver(driverConf, user, sessionState, query);
break;
} catch (Exception ex) {
Throwable cause = ex;
@@ -376,12 +377,13 @@ public class CompactorMR {
}
}
- String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
+ String query = buildMmCompactionQuery(driverConf, t, p, tmpTableName);
LOG.info("Compacting a MM table via " + query);
- DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds);
- commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds);
- DriverUtils.runOnDriver(conf, user, sessionState,
- "drop table if exists " + tmpTableName, null);
+ long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
+ DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
+ commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds, compactorTxnId);
+ DriverUtils.runOnDriver(driverConf, user, sessionState,
+ "drop table if exists " + tmpTableName);
} catch (HiveException e) {
LOG.error("Error compacting a MM table", e);
throw new IOException(e);
@@ -1002,7 +1004,7 @@ public class CompactorMR {
deleteEventWriter.close(false);
}
}
- private long getCompactorTxnId() {
+ private static long getCompactorTxnId(Configuration jobConf) {
String snapshot = jobConf.get(ValidTxnList.VALID_TXNS_KEY);
if(Strings.isNullOrEmpty(snapshot)) {
throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not found for writing to "
@@ -1010,7 +1012,7 @@ public class CompactorMR {
}
ValidTxnList validTxnList = new ValidReadTxnList();
validTxnList.readFromString(snapshot);
- //this is id of the current txn
+ //this is id of the current (compactor) txn
return validTxnList.getHighWatermark();
}
private void getWriter(Reporter reporter, ObjectInspector inspector,
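Why the high watermark is the compactor's txn id: Worker opens the compactor txn first, and the snapshot serialized into the JobConf is taken inside that txn, so the compactor's own txn is the newest id the snapshot knows about. A minimal sketch of the round trip, assuming VALID_TXNS_KEY was populated on the Worker side:

    // Worker side (sketch): long txnId = msc.openTxn(user, TxnType.COMPACTION);
    //                       jobConf.set(ValidTxnList.VALID_TXNS_KEY, snapshot.writeToString());
    // Mapper side, as in getCompactorTxnId() above:
    ValidTxnList validTxnList = new ValidReadTxnList();
    validTxnList.readFromString(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
    long compactorTxnId = validTxnList.getHighWatermark();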
@@ -1026,7 +1028,7 @@ public class CompactorMR {
.maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
.bucket(bucket)
.statementId(-1)//setting statementId == -1 makes compacted delta files use
- .visibilityTxnId(getCompactorTxnId());
+ .visibilityTxnId(getCompactorTxnId(jobConf));
//delta_xxxx_yyyy format
// Instantiate the underlying output format
@@ -1050,7 +1052,7 @@ public class CompactorMR {
.maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)).bucket(bucket)
.statementId(-1)//setting statementId == -1 makes compacted delta files use
// delta_xxxx_yyyy format
- .visibilityTxnId(getCompactorTxnId());
+ .visibilityTxnId(getCompactorTxnId(jobConf));
// Instantiate the underlying output format
@SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
@@ -1171,19 +1173,17 @@ public class CompactorMR {
.minimumWriteId(conf.getLong(MIN_TXN, Long.MAX_VALUE))
.maximumWriteId(conf.getLong(MAX_TXN, Long.MIN_VALUE))
.bucket(0)
- .statementId(-1);
+ .statementId(-1)
+ .visibilityTxnId(CompactorMap.getCompactorTxnId(conf));
Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent();
LOG.info(context.getJobID() + ": " + tmpLocation +
" not found. Assuming 0 splits. Creating " + newDeltaDir);
fs.mkdirs(newDeltaDir);
- createCompactorMarker(conf, newDeltaDir, fs);
AcidUtils.OrcAcidVersion.writeVersionFile(newDeltaDir, fs);
return;
}
- FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
- //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
- //name is that we want to rename; leave it for another day
- //todo: may actually have delta_x_y and delete_delta_x_y
+ FileStatus[] contents = fs.listStatus(tmpLocation);
+ //minor compaction may actually have delta_x_y and delete_delta_x_y
for (FileStatus fileStatus : contents) {
//newPath is the base/delta dir
Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
@@ -1193,16 +1193,9 @@ public class CompactorMR {
* meta files which will create base_x/ (i.e. B)...*/
fs.rename(fileStatus.getPath(), newPath);
AcidUtils.OrcAcidVersion.writeVersionFile(newPath, fs);
- createCompactorMarker(conf, newPath, fs);
}
fs.delete(tmpLocation, true);
}
- private void createCompactorMarker(JobConf conf, Path finalLocation, FileSystem fs)
- throws IOException {
- if(conf.getBoolean(IS_MAJOR, false)) {
- AcidUtils.MetaDataFile.createCompactorMarker(finalLocation, fs);
- }
- }
@Override
public void abortJob(JobContext context, int status) throws IOException {
@@ -1218,22 +1211,23 @@ public class CompactorMR {
* Note: similar logic to the main committer; however, no ORC versions and stuff like that.
* @param from The temp directory used for compactor output. Not the actual base/delta.
* @param to The final directory; basically a SD directory. Not the actual base/delta.
+ * @param compactorTxnId txn that the compactor started
*/
private void commitMmCompaction(String from, String to, Configuration conf,
- ValidWriteIdList actualWriteIds) throws IOException {
+ ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException {
Path fromPath = new Path(from), toPath = new Path(to);
FileSystem fs = fromPath.getFileSystem(conf);
- //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
- //getAcidState() doesn't check if X is valid in base_X_cY for compacted base dirs.
// Assume the high watermark can be used as maximum transaction ID.
+ //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
+ //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
long maxTxn = actualWriteIds.getHighWatermark();
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
- .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1);
+ .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1)
+ .visibilityTxnId(compactorTxnId);
Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
if (!fs.exists(fromPath)) {
LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
fs.mkdirs(newBaseDir);
- AcidUtils.MetaDataFile.createCompactorMarker(toPath, fs);
return;
}
LOG.info("Moving contents of " + from + " to " + to);
@@ -1243,7 +1237,6 @@ public class CompactorMR {
}
FileStatus dirPath = children[0];
fs.rename(dirPath.getPath(), newBaseDir);
- AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs);
fs.delete(fromPath, true);
}
}
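Net effect for the MM path: with visibilityTxnId set, AcidUtils.createFilename() above yields base_<hwm>_v<compactorTxnId> instead of base_<hwm> (e.g. maxTxn 2 and compactor txn 6 would give base_0000002_v0000006), and readers recognize the directory as compacted from the name alone, which is what lets both createCompactorMarker() call sites be deleted.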
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index e3fab69..42ccfdc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
@@ -156,6 +157,12 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
if (ci.runAs == null) {
ci.runAs = findUserToRunAs(sd.getLocation(), t);
}
+ /**
+ * We cannot have Worker use HiveTxnManager (which is on a ThreadLocal) since
+ * then the Driver would already have an open txn, and this txn would end up
+ * with multiple statements in it (for the query-based compactor), which is not
+ * supported. Since in this case some of the statements are DDL, it would not be
+ * allowed in a multi-stmt txn even in the future.
+ * {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
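A condensed sketch of the Worker-side sequence this sets up (names from the diff; heartbeat lifecycle and error paths omitted):

    // 1. open a dedicated txn covering the whole compaction attempt
    long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
    // 2. keep it alive while the potentially long compaction runs
    heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
    // 3. the query-based MM path later hands the id to a *separate* Driver via
    //    Driver#setCompactionWriteIds(writeIds, compactorTxnId); the compactor txn
    //    itself must not run multiple statements (see the comment above).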
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
index eb0bfd9..05b5f3e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -142,18 +142,14 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
runStatementOnDriver("insert into T values(5,6),(8,8)");
String testQuery = "select a, b, INPUT__FILE__NAME from T order by a, b";
String[][] expected = new String[][] {
- {"1\t2",
- "t/delta_0000001_0000001_0000/000000_0"},
- {"4\t5",
- "t/delta_0000001_0000001_0000/000000_0"},
- {"5\t6",
- "t/delta_0000002_0000002_0000/000000_0"},
- {"8\t8",
- "t/delta_0000002_0000002_0000/000000_0"}};
+ {"1\t2", "t/delta_0000001_0000001_0000/000000_0"},
+ {"4\t5", "t/delta_0000001_0000001_0000/000000_0"},
+ {"5\t6", "t/delta_0000002_0000002_0000/000000_0"},
+ {"8\t8", "t/delta_0000002_0000002_0000/000000_0"}};
checkResult(expected, testQuery, false, "check data", LOG);
- /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
- but in normal usage 'concatenate' is blocking, */
+ /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
+ but in normal usage 'concatenate' is blocking */
hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
runStatementOnDriver("alter table T concatenate");
@@ -166,14 +162,10 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
Assert.assertEquals(1, rsp.getCompactsSize());
Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
String[][] expected2 = new String[][] {
- {"1\t2",
- "t/base_0000002/000000_0"},
- {"4\t5",
- "t/base_0000002/000000_0"},
- {"5\t6",
- "t/base_0000002/000000_0"},
- {"8\t8",
- "t/base_0000002/000000_0"}};
+ {"1\t2", "t/base_0000002_v0000020/000000_0"},
+ {"4\t5", "t/base_0000002_v0000020/000000_0"},
+ {"5\t6", "t/base_0000002_v0000020/000000_0"},
+ {"8\t8", "t/base_0000002_v0000020/000000_0"}};
checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index 55131f3..2512579 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -610,7 +610,7 @@ public class TestStatsUpdaterThread {
}
private void executeQuery(String query) throws HiveException {
- DriverUtils.runOnDriver(hiveConf, ss.getUserName(), ss, query, null);
+ DriverUtils.runOnDriver(hiveConf, ss.getUserName(), ss, query);
}
private StatsUpdaterThread createUpdater() throws MetaException {