Posted to commits@hive.apache.org by ek...@apache.org on 2017/12/01 02:40:07 UTC
[3/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)
HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/508d7e6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/508d7e6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/508d7e6f
Branch: refs/heads/master
Commit: 508d7e6f269398a47147a697aecdbe546425679b
Parents: c03001e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Nov 30 18:39:42 2017 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Nov 30 18:39:42 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hadoop/hive/ql/history/TestHiveHistory.java | 2 +-
.../hive/ql/txn/compactor/TestCompactor.java | 33 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 3 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 8 +-
.../repl/bootstrap/load/table/LoadTable.java | 1 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 267 ++++++++---
.../hadoop/hive/ql/io/HiveInputFormat.java | 3 +
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 128 +++--
.../hive/ql/io/orc/OrcRawRecordMerger.java | 330 +++++++++++--
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 4 +-
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 3 +
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 75 +--
.../ql/io/orc/VectorizedOrcInputFormat.java | 2 -
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 32 ++
.../apache/hadoop/hive/ql/metadata/Hive.java | 78 +++-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 6 +-
.../hive/ql/parse/LoadSemanticAnalyzer.java | 22 +-
.../hadoop/hive/ql/plan/LoadTableDesc.java | 25 +-
.../hive/ql/txn/compactor/CompactorMR.java | 18 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 13 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 7 +-
.../apache/hadoop/hive/ql/TestTxnLoadData.java | 467 +++++++++++++++++++
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 45 +-
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 31 +-
.../hadoop/hive/ql/exec/TestExecDriver.java | 2 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 51 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 14 +-
.../TestVectorizedOrcAcidRowBatchReader.java | 4 +-
.../clientnegative/load_data_into_acid.q | 4 +-
.../clientnegative/load_data_into_acid.q.out | 6 +-
.../clientpositive/acid_table_stats.q.out | 1 +
.../clientpositive/autoColumnStats_4.q.out | 3 +
.../results/clientpositive/mm_default.q.out | 1 +
.../TransactionalValidationListener.java | 20 +-
36 files changed, 1392 insertions(+), 321 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ada2318..3be5a8d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1912,7 +1912,7 @@ public class HiveConf extends Configuration {
"1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" +
"4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" +
"This is intended to be used as an internal property for future versions of ACID. (See\n" +
- "HIVE-14035 for details.)"),
+ "HIVE-14035 for details. User sets it tblproperites via transactional_properties.)", true),
HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
"current open transactions reach this limit, future open transaction requests will be \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 2f0efce..d73cd64 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -106,7 +106,7 @@ public class TestHiveHistory extends TestCase {
db.createTable(src, cols, null, TextInputFormat.class,
IgnoreKeyTextOutputFormat.class);
db.loadTable(hadoopDataFile[i], src,
- LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false);
+ LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0);
i++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7103fb9..a1cd9eb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -114,8 +115,6 @@ public class TestCompactor {
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
- hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
- //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.cleanDb(hiveConf);
@@ -669,7 +668,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
} finally {
connection.close();
@@ -697,7 +696,8 @@ public class TestCompactor {
writeBatch(connection, writer, false);
}
- // Start a third batch, but don't close it.
+ // Start a third batch, but don't close it. this delta will be ignored by compaction since
+ // it has an open txn in it
writeBatch(connection, writer, true);
// Now, compact
@@ -722,7 +722,7 @@ public class TestCompactor {
}
String name = stat[0].getPath().getName();
Assert.assertEquals(name, "base_0000006");
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
} finally {
connection.close();
}
@@ -788,7 +788,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
} finally {
connection.close();
}
@@ -850,7 +850,7 @@ public class TestCompactor {
if (!name.equals("base_0000006")) {
Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
}
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
} finally {
connection.close();
}
@@ -903,7 +903,7 @@ public class TestCompactor {
}
String name = stat[0].getPath().getName();
Assert.assertEquals(name, "base_0000006");
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 2);
} finally {
connection.close();
}
@@ -966,7 +966,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -984,7 +984,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L, 1);
}
@Test
@@ -1043,7 +1043,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -1062,7 +1062,7 @@ public class TestCompactor {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
// There should be no rows in the delete_delta because there have been no delete events.
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1);
}
@Test
@@ -1121,7 +1121,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
// Verify that we have got correct set of delete_deltas also
FileStatus[] deleteDeltaStat =
@@ -1140,7 +1140,7 @@ public class TestCompactor {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
// There should be no rows in the delete_delta because there have been no delete events.
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1);
} finally {
connection.close();
@@ -1295,7 +1295,7 @@ public class TestCompactor {
}
private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
- String columnTypesProperty, int bucket, long min, long max)
+ String columnTypesProperty, int bucket, long min, long max, int numBuckets)
throws IOException {
ValidTxnList txnList = new ValidTxnList() {
@Override
@@ -1351,9 +1351,10 @@ public class TestCompactor {
Configuration conf = new Configuration();
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty);
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty);
+ conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets));
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
AcidInputFormat.RawReader<OrcStruct> reader =
- aif.getRawReader(conf, false, bucket, txnList, base, deltas);
+ aif.getRawReader(conf, true, bucket, txnList, base, deltas);
RecordIdentifier identifier = reader.createKey();
OrcStruct value = reader.createValue();
long currentTxn = min;
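The new numBuckets argument feeds the reader Configuration: getRawReader() may now encounter raw-format files (written before conversion to ACID, or by LOAD DATA), and for those the bucket count has to come from table properties rather than from the file itself. A sketch of the configuration the test builds, assuming the usual string values behind the constants it uses:

import org.apache.hadoop.conf.Configuration;

// Sketch of the reader conf assembled in checkExpectedTxnsPresent(); the literal
// property keys below are assumed to match the constants referenced in the test.
public class RawReaderConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("schema.evolution.columns", "a,b");               // IOConstants.SCHEMA_EVOLUTION_COLUMNS
    conf.set("schema.evolution.columns.types", "int,string");  // IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES
    conf.set("bucket_count", "1");                             // hive_metastoreConstants.BUCKET_COUNT (numBuckets)
    conf.setBoolean("hive.transactional.table.scan", true);    // HIVE_TRANSACTIONAL_TABLE_SCAN
    System.out.println("bucket_count=" + conf.get("bucket_count"));
  }
}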
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 186d580..2f7284f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -376,7 +376,6 @@ public enum ErrorMsg {
DBTXNMGR_REQUIRES_CONCURRENCY(10264,
"To use DbTxnManager you must set hive.support.concurrency=true"),
TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),
- LOAD_DATA_ON_ACID_TABLE(10266, "LOAD DATA... statement is not supported on transactional table {0}.", true),
LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " +
"may have timed out", true),
LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " +
@@ -550,6 +549,8 @@ public enum ErrorMsg {
ACID_TABLES_MUST_BE_READ_WITH_ACID_READER(30021, "An ORC ACID reader required to read ACID tables"),
ACID_TABLES_MUST_BE_READ_WITH_HIVEINPUTFORMAT(30022, "Must use HiveInputFormat to read ACID tables " +
"(set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat)"),
+ ACID_LOAD_DATA_INVALID_FILE_NAME(30023, "{0} file name is not valid in Load Data into Acid " +
+ "table {1}. Examples of valid names are: 00000_0, 00000_0_copy_1", true),
CONCATENATE_UNSUPPORTED_FILE_FORMAT(30030, "Concatenate/Merge only supported for RCFile and ORCFile formats"),
CONCATENATE_UNSUPPORTED_TABLE_BUCKETED(30031, "Concatenate/Merge can not be performed on bucketed tables"),
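The new error text names the accepted file-name shapes. A minimal sketch of that check, using regexes equivalent to AcidUtils.ORIGINAL_PATTERN and (assumed here) its _copy_ variant rather than Hive's actual validation code:

import java.util.regex.Pattern;

// Illustrative check for names LOAD DATA accepts on an ACID table; anything else
// should surface ACID_LOAD_DATA_INVALID_FILE_NAME (error 30023).
public class LoadDataFileNameCheck {
  // mirrors AcidUtils.ORIGINAL_PATTERN; the _copy_ pattern is an assumption based on the message
  private static final Pattern ORIGINAL = Pattern.compile("[0-9]+_[0-9]+");
  private static final Pattern ORIGINAL_COPY = Pattern.compile("[0-9]+_[0-9]+_copy_[0-9]+");

  static boolean isValidName(String name) {
    return ORIGINAL.matcher(name).matches() || ORIGINAL_COPY.matcher(name).matches();
  }

  public static void main(String[] args) {
    System.out.println(isValidName("00000_0"));        // true
    System.out.println(isValidName("00000_0_copy_1")); // true
    System.out.println(isValidName("part-00000.orc")); // false -> error 30023
  }
}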
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4076a9f..9184844 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4488,7 +4488,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
part.getTPartition().getParameters().putAll(alterTbl.getProps());
} else {
boolean isFromMmTable = AcidUtils.isInsertOnlyTable(tbl.getParameters());
- Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(alterTbl.getProps());
+ Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps());
if (isToMmTable != null) {
if (!isFromMmTable && isToMmTable) {
result = generateAddMmTasks(tbl);
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e2f8c1f..6d13773 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -215,7 +215,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
Context ctx = driverContext.getCtx();
if(ctx.getHiveTxnManager().supportsAcid()) {
- //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit
+ //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes logic more explicit
return;
}
HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager();
@@ -290,7 +290,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
} else {
Utilities.FILE_OP_LOGGER.debug("MoveTask moving " + sourcePath + " to " + targetPath);
if(lfd.getWriteType() == AcidUtils.Operation.INSERT) {
- //'targetPath' is table root of un-partitioned table/partition
+ //'targetPath' is table root of un-partitioned table or partition
//'sourcePath' result of 'select ...' part of CTAS statement
assert lfd.getIsDfsDir();
FileSystem srcFs = sourcePath.getFileSystem(conf);
@@ -367,7 +367,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
checkFileFormats(db, tbd, table);
boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID
- && !tbd.isMmTable();
+ && !tbd.isMmTable(); //it seems that LoadTableDesc has Operation.INSERT only for CTAS...
// Create a data container
DataContainer dc = null;
@@ -379,7 +379,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
}
db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
- tbd.getTxnId(), tbd.getStmtId(), tbd.isMmTable());
+ tbd.getTxnId(), tbd.getStmtId());
if (work.getOutputs() != null) {
DDLTask.addIfAbsentByName(new WriteEntity(table,
getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index bb1f4e5..545b7a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -229,6 +229,7 @@ public class LoadTable {
LoadTableDesc loadTableWork = new LoadTableDesc(
tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
+ //todo: what is the point of this? If this is for replication, who would have opened a txn?
SessionState.get().getTxnMgr().getCurrentTxnId()
);
MoveWork moveWork =
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4c0b71f..9ab028d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -30,6 +30,8 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,18 +41,17 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.Ref;
import org.apache.orc.impl.OrcAcidUtils;
+import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,13 +123,14 @@ public class AcidUtils {
public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
/**
- * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read
- * a "delta" dir written by a real Acid write - cannot have any copies
+ * A write into a non-acid table produces files like 0000_0 or 0000_0_copy_1
+ * (unless via a Load Data statement)
*/
public static final PathFilter originalBucketFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
- return ORIGINAL_PATTERN.matcher(path.getName()).matches();
+ return ORIGINAL_PATTERN.matcher(path.getName()).matches() ||
+ ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches();
}
};
@@ -137,6 +139,7 @@ public class AcidUtils {
}
private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
+ public static final Pattern BUCKET_PATTERN = Pattern.compile(BUCKET_PREFIX + "_[0-9]{5}$");
public static final Pattern ORIGINAL_PATTERN =
Pattern.compile("[0-9]+_[0-9]+");
/**
@@ -156,14 +159,30 @@ public class AcidUtils {
private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
/**
- * Create the bucket filename.
+ * Create the bucket filename in Acid format
* @param subdir the subdirectory for the bucket.
* @param bucket the bucket number
* @return the filename
*/
public static Path createBucketFile(Path subdir, int bucket) {
- return new Path(subdir,
+ return createBucketFile(subdir, bucket, true);
+ }
+
+ /**
+ * Create acid or original bucket name
+ * @param subdir the subdirectory for the bucket.
+ * @param bucket the bucket number
+ * @return the filename
+ */
+ private static Path createBucketFile(Path subdir, int bucket, boolean isAcidSchema) {
+ if(isAcidSchema) {
+ return new Path(subdir,
BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
+ }
+ else {
+ return new Path(subdir,
+ String.format(BUCKET_DIGITS, bucket));
+ }
}
/**
@@ -244,7 +263,7 @@ public class AcidUtils {
* @param path the base directory name
* @return the maximum transaction id that is included
*/
- static long parseBase(Path path) {
+ public static long parseBase(Path path) {
String filename = path.getName();
if (filename.startsWith(BASE_PREFIX)) {
return Long.parseLong(filename.substring(BASE_PREFIX.length()));
@@ -262,7 +281,7 @@ public class AcidUtils {
*/
public static AcidOutputFormat.Options
parseBaseOrDeltaBucketFilename(Path bucketFile,
- Configuration conf) {
+ Configuration conf) throws IOException {
AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
String filename = bucketFile.getName();
if (ORIGINAL_PATTERN.matcher(filename).matches()) {
@@ -273,7 +292,7 @@ public class AcidUtils {
.minimumTransactionId(0)
.maximumTransactionId(0)
.bucket(bucket)
- .writingBase(true);
+ .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
}
else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
//todo: define groups in regex and use parseInt(Matcher.group(2))....
@@ -286,7 +305,7 @@ public class AcidUtils {
.maximumTransactionId(0)
.bucket(bucket)
.copyNumber(copyNumber)
- .writingBase(true);
+ .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
}
else if (filename.startsWith(BUCKET_PREFIX)) {
int bucket =
@@ -299,14 +318,16 @@ public class AcidUtils {
.bucket(bucket)
.writingBase(true);
} else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
- ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX);
+ ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX,
+ bucketFile.getFileSystem(conf));
result
.setOldStyle(false)
.minimumTransactionId(parsedDelta.minTransaction)
.maximumTransactionId(parsedDelta.maxTransaction)
.bucket(bucket);
} else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
- ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX);
+ ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
+ bucketFile.getFileSystem(conf));
result
.setOldStyle(false)
.minimumTransactionId(parsedDelta.minTransaction)
@@ -344,11 +365,17 @@ public class AcidUtils {
throw new IllegalArgumentException("Unexpected Operation: " + op);
}
}
-
public enum AcidBaseFileType {
- COMPACTED_BASE, // a regular base file generated through major compaction
- ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid
- INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update
+ /**
+ * File w/o Acid meta columns. This would be the case for files that were added to the table
+ * before it was converted to Acid but not yet major compacted. May also be the result of
+ * Load Data statement on an acid table.
+ */
+ ORIGINAL_BASE,
+ /**
+ * File that has Acid metadata columns embedded in it. Found in base_x/ or delta_x_y/.
+ */
+ ACID_SCHEMA,
}
/**
@@ -366,16 +393,12 @@ public class AcidUtils {
this.acidBaseFileType = acidBaseFileType;
}
- public boolean isCompactedBase() {
- return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE;
- }
-
public boolean isOriginal() {
return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE;
}
- public boolean isInsertDelta() {
- return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA;
+ public boolean isAcidSchema() {
+ return this.acidBaseFileType == AcidBaseFileType.ACID_SCHEMA;
}
public HdfsFileStatusWithId getHdfsFileStatusWithId() {
@@ -545,6 +568,7 @@ public class AcidUtils {
* @return the base directory to read
*/
Path getBaseDirectory();
+ boolean isBaseInRawFormat();
/**
* Get the list of original files. Not {@code null}. Must be sorted.
@@ -576,7 +600,10 @@ public class AcidUtils {
List<FileStatus> getAbortedDirectories();
}
- public static class ParsedDelta implements Comparable<ParsedDelta> {
+ /**
+ * Immutable
+ */
+ public static final class ParsedDelta implements Comparable<ParsedDelta> {
private final long minTransaction;
private final long maxTransaction;
private final FileStatus path;
@@ -584,19 +611,24 @@ public class AcidUtils {
//had no statement ID
private final int statementId;
private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
+ private final boolean isRawFormat;
/**
* for pre 1.3.x delta files
*/
- ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) {
- this(min, max, path, -1, isDeleteDelta);
+ private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta,
+ boolean isRawFormat) {
+ this(min, max, path, -1, isDeleteDelta, isRawFormat);
}
- ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) {
+ private ParsedDelta(long min, long max, FileStatus path, int statementId,
+ boolean isDeleteDelta, boolean isRawFormat) {
this.minTransaction = min;
this.maxTransaction = max;
this.path = path;
this.statementId = statementId;
this.isDeleteDelta = isDeleteDelta;
+ this.isRawFormat = isRawFormat;
+ assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format";
}
public long getMinTransaction() {
@@ -618,7 +650,12 @@ public class AcidUtils {
public boolean isDeleteDelta() {
return isDeleteDelta;
}
-
+ /**
+ * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
+ */
+ public boolean isRawFormat() {
+ return isRawFormat;
+ }
/**
* Compactions (Major/Minor) merge deltas/bases but delete of old files
* happens in a different process; thus it's possible to have bases/deltas with
@@ -698,29 +735,6 @@ public class AcidUtils {
}
/**
- * Convert the list of begin/end transaction id pairs to a list of delta
- * directories. Note that there may be multiple delta files for the exact same txn range starting
- * with 1.3.x;
- * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
- * @param root the root directory
- * @param deltas list of begin/end transaction id pairs
- * @return the list of delta paths
- */
- public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
- List<Path> results = new ArrayList<Path>(deltas.size());
- for(AcidInputFormat.DeltaMetaData dmd : deltas) {
- if(dmd.getStmtIds().isEmpty()) {
- results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
- continue;
- }
- for(Integer stmtId : dmd.getStmtIds()) {
- results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
- }
- }
- return results.toArray(new Path[results.size()]);
- }
-
- /**
* Convert the list of begin/end transaction id pairs to a list of delete delta
* directories. Note that there may be multiple delete_delta files for the exact same txn range starting
* with 2.2.x;
@@ -743,25 +757,29 @@ public class AcidUtils {
return results.toArray(new Path[results.size()]);
}
- public static ParsedDelta parsedDelta(Path deltaDir) {
+ public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
String deltaDirName = deltaDir.getName();
if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
- return parsedDelta(deltaDir, DELETE_DELTA_PREFIX);
+ return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs);
}
- return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix
+ return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix
}
- private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) {
- ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix);
+ private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix, FileSystem fs)
+ throws IOException {
+ ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs);
boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
return new ParsedDelta(p.getMinTransaction(),
- p.getMaxTransaction(), path, p.statementId, isDeleteDelta);
+ p.getMaxTransaction(), path, p.statementId, isDeleteDelta, p.isRawFormat());
}
- public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) {
+ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs)
+ throws IOException {
String filename = deltaDir.getName();
boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
if (filename.startsWith(deltaPrefix)) {
+ //small optimization - delete delta can't be in raw format
+ boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs);
String rest = filename.substring(deltaPrefix.length());
int split = rest.indexOf('_');
int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
@@ -770,10 +788,10 @@ public class AcidUtils {
Long.parseLong(rest.substring(split + 1)) :
Long.parseLong(rest.substring(split + 1, split2));
if(split2 == -1) {
- return new ParsedDelta(min, max, null, isDeleteDelta);
+ return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
}
int statementId = Integer.parseInt(rest.substring(split2 + 1));
- return new ParsedDelta(min, max, null, statementId, isDeleteDelta);
+ return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat);
}
throw new IllegalArgumentException(deltaDir + " does not start with " +
deltaPrefix);
@@ -871,13 +889,13 @@ public class AcidUtils {
if (childrenWithId != null) {
for (HdfsFileStatusWithId child : childrenWithId) {
getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original,
- obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
+ obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
}
} else {
List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
for (FileStatus child : children) {
getChildState(child, null, txnList, working, originalDirectories, original, obsolete,
- bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
+ bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
}
}
@@ -976,12 +994,18 @@ public class AcidUtils {
//this does "Path.uri.compareTo(that.uri)"
return o1.getFileStatus().compareTo(o2.getFileStatus());
});
- return new Directory(){
+
+ final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs);
+ return new Directory() {
@Override
public Path getBaseDirectory() {
return base;
}
+ @Override
+ public boolean isBaseInRawFormat() {
+ return isBaseInRawFormat;
+ }
@Override
public List<HdfsFileStatusWithId> getOriginalFiles() {
@@ -1022,7 +1046,8 @@ public class AcidUtils {
private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
- boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties) throws IOException {
+ boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties,
+ FileSystem fs) throws IOException {
Path p = child.getPath();
String fn = p.getName();
if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -1050,7 +1075,7 @@ public class AcidUtils {
&& child.isDir()) {
String deltaPrefix =
(fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
- ParsedDelta delta = parseDelta(child, deltaPrefix);
+ ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) &&
ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) {
aborted.add(child);
@@ -1171,8 +1196,11 @@ public class AcidUtils {
parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isAcidTable));
}
- public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
- HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
+ /**
+ * Means it's a full acid table
+ */
+ public static void setTransactionalTableScan(Configuration conf, boolean isFullAcidTable) {
+ HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isFullAcidTable);
}
/**
* @param p - not null
@@ -1185,6 +1213,8 @@ public class AcidUtils {
* SessionState.get().getTxnMgr().supportsAcid() here
* @param table table
* @return true if table is a legit ACID table, false otherwise
+ * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers
+ * both Acid and MM tables. HIVE-18124
*/
public static boolean isAcidTable(Table table) {
if (table == null) {
@@ -1197,6 +1227,10 @@ public class AcidUtils {
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
+ /**
+ * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers
+ * both Acid and MM tables. HIVE-18124
+ */
public static boolean isAcidTable(CreateTableDesc table) {
if (table == null || table.getTblProps() == null) {
return false;
@@ -1208,8 +1242,11 @@ public class AcidUtils {
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
+ /**
+ * after isAcidTable() is renamed to isTransactionalTable(), make this isAcid(). HIVE-18124
+ */
public static boolean isFullAcidTable(Table table) {
- return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table.getParameters());
+ return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table);
}
/**
@@ -1336,6 +1373,9 @@ public class AcidUtils {
public static boolean isInsertOnlyTable(Map<String, String> params) {
return isInsertOnlyTable(params, false);
}
+ public static boolean isInsertOnlyTable(Table table) {
+ return isAcidTable(table) && getAcidOperationalProperties(table).isInsertOnly();
+ }
// TODO [MM gap]: CTAS may currently be broken. It used to work. See the old code, and why isCtas isn't used?
public static boolean isInsertOnlyTable(Map<String, String> params, boolean isCtas) {
@@ -1349,13 +1389,21 @@ public class AcidUtils {
return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
}
- /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. */
- public static Boolean isToInsertOnlyTable(Map<String, String> props) {
+ /**
+ * The method for altering table props; may set the table to MM, non-MM, or not affect MM.
+ * todo: All such validation logic should be in TransactionalValidationListener
+ * @param tbl object image before alter table command
+ * @param props prop values set in this alter table command
+ */
+ public static Boolean isToInsertOnlyTable(Table tbl, Map<String, String> props) {
// Note: Setting these separately is a very hairy issue in certain combinations, since we
// cannot decide what type of table this becomes without taking both into account, and
// in many cases the conversion might be illegal.
// The only thing we allow is tx = true w/o tx-props, for backward compat.
String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ if(transactional == null) {
+ transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ }
String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (transactional == null && transactionalProp == null) return null; // Not affected.
boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
@@ -1378,4 +1426,81 @@ public class AcidUtils {
hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
return hasTxn || hasProps;
}
+
+ /**
+ * Load Data commands against Acid tables write {@link AcidBaseFileType#ORIGINAL_BASE} type files
+ * into delta_x_x/ (or base_x in case there is Overwrite clause). {@link MetaDataFile} is a
+ * small JSON file in this directory that indicates that these files don't have Acid metadata
+ * columns and so the values for these columns need to be assigned at read time/compaction.
+ */
+ public static class MetaDataFile {
+ //export command uses _metadata....
+ private static final String METADATA_FILE = "_metadata_acid";
+ private static final String CURRENT_VERSION = "0";
+ //todo: enums? that have both field name and value list
+ private interface Field {
+ String VERSION = "thisFileVersion";
+ String DATA_FORMAT = "dataFormat";
+ }
+ private interface Value {
+ //plain ORC file
+ String RAW = "raw";
+ //result of acid write, i.e. decorated with ROW__ID info
+ String NATIVE = "native";
+ }
+
+ /**
+ * @param baseOrDeltaDir delta or base dir, must exist
+ */
+ public static void createMetaFile(Path baseOrDeltaDir, FileSystem fs, boolean isRawFormat)
+ throws IOException {
+ /**
+ * create the _metadata_acid json file in baseOrDeltaDir and
+ * write thisFileVersion, dataFormat
+ *
+ * on read, if the file is not there, assume version 0 and dataFormat=native
+ */
+ Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
+ Map<String, String> metaData = new HashMap<>();
+ metaData.put(Field.VERSION, CURRENT_VERSION);
+ metaData.put(Field.DATA_FORMAT, isRawFormat ? Value.RAW : Value.NATIVE);
+ try (FSDataOutputStream strm = fs.create(formatFile, false)) {
+ new ObjectMapper().writeValue(strm, metaData);
+ } catch (IOException ioe) {
+ String msg = "Failed to create " + baseOrDeltaDir + "/" + METADATA_FILE
+ + ": " + ioe.getMessage();
+ LOG.error(msg, ioe);
+ throw ioe;
+ }
+ }
+ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+ Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
+ if(!fs.exists(formatFile)) {
+ return false;
+ }
+ try (FSDataInputStream strm = fs.open(formatFile)) {
+ Map<String, String> metaData = new ObjectMapper().readValue(strm, Map.class);
+ if(!CURRENT_VERSION.equalsIgnoreCase(metaData.get(Field.VERSION))) {
+ throw new IllegalStateException("Unexpected Meta Data version: "
+ + metaData.get(Field.VERSION));
+ }
+ String dataFormat = metaData.getOrDefault(Field.DATA_FORMAT, "null");
+ switch (dataFormat) {
+ case Value.NATIVE:
+ return false;
+ case Value.RAW:
+ return true;
+ default:
+ throw new IllegalArgumentException("Unexpected value for " + Field.DATA_FORMAT
+ + ": " + dataFormat);
+ }
+ }
+ catch(IOException e) {
+ String msg = "Failed to read " + baseOrDeltaDir + "/" + METADATA_FILE
+ + ": " + e.getMessage();
+ LOG.error(msg, e);
+ throw e;
+ }
+ }
+ }
}
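To make the MetaDataFile contract above concrete: a delta_x_x/ or base_x/ directory written by LOAD DATA carries a small _metadata_acid JSON file, and its absence means the data is in native (acid) format. A self-contained sketch of the payload createMetaFile() writes, using the same field/value strings and the org.codehaus.jackson mapper the patch imports (key order in the output may differ):

import java.util.HashMap;
import java.util.Map;
import org.codehaus.jackson.map.ObjectMapper;

// Prints the JSON body of the _metadata_acid marker for a raw-format directory:
// {"thisFileVersion":"0","dataFormat":"raw"}  ("native" for acid-written data).
public class MetaDataFileSketch {
  public static void main(String[] args) throws Exception {
    Map<String, String> metaData = new HashMap<>();
    metaData.put("thisFileVersion", "0");  // Field.VERSION / CURRENT_VERSION
    metaData.put("dataFormat", "raw");     // Value.RAW for files landed by LOAD DATA
    System.out.println(new ObjectMapper().writeValueAsString(metaData));
  }
}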
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 6a1dc72..819c2a2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -468,6 +468,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
try {
Utilities.copyTablePropertiesToConf(table, conf);
+ if(tableScan != null) {
+ AcidUtils.setTransactionalTableScan(conf, tableScan.getConf().isAcidTable());
+ }
} catch (HiveException e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index dda9f93..becdc71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc;
import org.apache.hadoop.hive.ql.plan.DynamicValue.NoDynamicValuesException;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import java.io.IOException;
@@ -409,7 +410,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* @param readerSchema the types for the reader
* @param conf the configuration
*/
- public static boolean[] genIncludedColumns(TypeDescription readerSchema,
+ static boolean[] genIncludedColumns(TypeDescription readerSchema,
Configuration conf) {
if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
@@ -419,7 +420,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
- public static String[] getSargColumnNames(String[] originalColumnNames,
+ private static String[] getSargColumnNames(String[] originalColumnNames,
List<OrcProto.Type> types, boolean[] includedColumns, boolean isOriginal) {
int rootColumn = getRootColumn(isOriginal);
String[] columnNames = new String[types.size() - rootColumn];
@@ -695,21 +696,29 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
*/
@VisibleForTesting
static final class AcidDirInfo {
- public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
+ AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
List<AcidBaseFileInfo> baseFiles,
- List<ParsedDelta> parsedDeltas) {
+ List<ParsedDelta> deleteEvents) {
this.splitPath = splitPath;
this.acidInfo = acidInfo;
this.baseFiles = baseFiles;
this.fs = fs;
- this.parsedDeltas = parsedDeltas;
+ this.deleteEvents = deleteEvents;
}
final FileSystem fs;
final Path splitPath;
final AcidUtils.Directory acidInfo;
final List<AcidBaseFileInfo> baseFiles;
- final List<ParsedDelta> parsedDeltas;
+ final List<ParsedDelta> deleteEvents;
+
+ /**
+ * @return true if no (qualifying) data files were found in {@link #splitPath}
+ */
+ boolean isEmpty() {
+ return (baseFiles == null || baseFiles.isEmpty());
+ }
}
@VisibleForTesting
@@ -884,7 +893,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public CombineResult combineWith(FileSystem fs, Path dir,
List<HdfsFileStatusWithId> otherFiles, boolean isOriginal) {
if ((files.size() + otherFiles.size()) > ETL_COMBINE_FILE_LIMIT
- || this.isOriginal != isOriginal) {
+ || this.isOriginal != isOriginal) {//todo: what is this checking????
return (files.size() > otherFiles.size())
? CombineResult.NO_AND_SWAP : CombineResult.NO_AND_CONTINUE;
}
@@ -1083,6 +1092,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
static final class FileGenerator implements Callable<AcidDirInfo> {
private final Context context;
private final FileSystem fs;
+ /**
+ * For plain or acid tables this is the root of the partition (or table if not partitioned).
+ * For an MM table this is a delta/ or base/ dir. In the MM case, the ValidTxnList filtering that
+ * {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} normally does has already
+ * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidTxnList)}.
+ */
private final Path dir;
private final Ref<Boolean> useFileIds;
private final UserGroupInformation ugi;
@@ -1119,25 +1134,27 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
private AcidDirInfo callInternal() throws IOException {
+ //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
context.transactionList, useFileIds, true, null);
- Path base = dirInfo.getBaseDirectory();
// find the base files (original or new style)
- List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();
- if (base == null) {
+ List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
+ if (dirInfo.getBaseDirectory() == null) {
+ //for non-acid tables, all data files are in getOriginalFiles() list
for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
}
} else {
- List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(base, useFileIds);
+ List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds);
for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
- baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE));
+ baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ?
+ AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
}
}
// Find the parsed deltas- some of them containing only the insert delta events
// may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details)
- List<ParsedDelta> parsedDeltas = new ArrayList<ParsedDelta>();
+ List<ParsedDelta> parsedDeltas = new ArrayList<>();
if (context.acidOperationalProperties != null &&
context.acidOperationalProperties.isSplitUpdate()) {
@@ -1154,15 +1171,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (parsedDelta.isDeleteDelta()) {
parsedDeltas.add(parsedDelta);
} else {
+ AcidUtils.AcidBaseFileType deltaType = parsedDelta.isRawFormat() ?
+ AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA;
+ PathFilter bucketFilter = parsedDelta.isRawFormat() ?
+ AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter;
+ if(parsedDelta.isRawFormat() && parsedDelta.getMinTransaction() !=
+ parsedDelta.getMaxTransaction()) {
+ //delta/ with files in raw format is a result of Load Data (as opposed to compaction
+ //or streaming ingest) and so must have interval length == 1.
+ throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE
+ + " format but txnIds are out of range: " + parsedDelta.getPath());
+ }
// This is a normal insert delta, which only has insert events and hence all the files
// in this delta directory can be considered as a base.
Boolean val = useFileIds.value;
if (val == null || val) {
try {
List<HdfsFileStatusWithId> insertDeltaFiles =
- SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
+ SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter);
for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
- baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+ baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
}
if (val == null) {
useFileIds.value = true; // The call succeeded, so presumably the API is there.
@@ -1176,15 +1204,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
// Fall back to regular API and create statuses without ID.
- List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter);
for (FileStatus child : children) {
HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
- baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+ baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
}
}
}
} else {
+ /*
+ We already handled all delete deltas above and there should not be any other deltas for
+ any table type. (this was acid 1.0 code path).
+ */
+ assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir;
// When split-update is not enabled, then all the deltas in the current directories
// should be considered as usual.
parsedDeltas.addAll(dirInfo.getCurrentDirectories());
@@ -1658,7 +1691,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
pathFutures.add(ecs.submit(fileGenerator));
}
- boolean isTransactionalTableScan =//this never seems to be set correctly
+ boolean isTransactionalTableScan =
HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
TypeDescription readerSchema =
@@ -1700,13 +1733,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// We have received a new directory information, make split strategies.
--resultsLeft;
-
+ if(adi.isEmpty()) {
+ //no files found, for example empty table/partition
+ continue;
+ }
// The reason why we can get a list of split strategies here is because for ACID split-update
// case when we have a mix of original base files & insert deltas, we will produce two
// independent split strategies for them. There is a global flag 'isOriginal' that is set
// on a per split strategy basis and it has to be same for all the files in that strategy.
List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
- adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
+ adi.splitPath, adi.baseFiles, adi.deleteEvents, readerTypes, ugi,
allowSyntheticFileIds);
for (SplitStrategy<?> splitStrategy : splitStrategies) {
@@ -1790,6 +1826,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
boolean isOriginal, UserGroupInformation ugi, boolean allowSyntheticFileIds,
boolean isDefaultFs) {
if (!deltas.isEmpty() || combinedCtx == null) {
+ //why is this checking for deltas.isEmpty() - HIVE-18110
return new ETLSplitStrategy(
context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi,
allowSyntheticFileIds, isDefaultFs);
@@ -1955,6 +1992,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
mergerOptions.rootPath(split.getRootDir());
+ mergerOptions.bucketPath(split.getPath());
final int bucket;
if (split.hasBase()) {
AcidOutputFormat.Options acidIOOptions =
@@ -1968,8 +2006,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
} else {
bucket = (int) split.getStart();
+ assert false : "We should never have a split w/o base in acid 2.0 for full acid: " + split.getPath();
}
-
+ //todo: createOptionsForReader() assumes it's !isOriginal.... why?
final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
readOptions.range(split.getStart(), split.getLength());
@@ -2041,6 +2080,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
readerOptions.include(OrcInputFormat.genIncludedColumns(schema, conf));
+ //todo: last param is bogus. why is this hardcoded?
OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, true);
return readerOptions;
}
@@ -2144,7 +2184,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<AcidBaseFileInfo> baseFiles,
List<ParsedDelta> parsedDeltas,
List<OrcProto.Type> readerTypes,
- UserGroupInformation ugi, boolean allowSyntheticFileIds) {
+ UserGroupInformation ugi, boolean allowSyntheticFileIds) throws IOException {
List<SplitStrategy<?>> splitStrategies = new ArrayList<SplitStrategy<?>>();
SplitStrategy<?> splitStrategy;
@@ -2153,23 +2193,24 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
boolean isDefaultFs = (!checkDefaultFs) || ((fs instanceof DistributedFileSystem)
&& HdfsUtils.isDefaultFs((DistributedFileSystem) fs));
- // When no baseFiles, we will just generate a single split strategy and return.
- List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
if (baseFiles.isEmpty()) {
- splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles,
+ assert false : "acid 2.0 no base?!: " + dir;
+ splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, Collections.emptyList(),
false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds, isDefaultFs);
if (splitStrategy != null) {
splitStrategies.add(splitStrategy);
}
- return splitStrategies; // return here
+ return splitStrategies;
}
+ List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<>();
List<HdfsFileStatusWithId> originalSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
// Separate the base files into acid schema and non-acid(original) schema files.
for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) {
if (acidBaseFileInfo.isOriginal()) {
originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
} else {
+ assert acidBaseFileInfo.isAcidSchema();
acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
}
}
@@ -2195,14 +2236,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return splitStrategies;
}
- @VisibleForTesting
- static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
+ private static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
FileSystem fs, Path dir,
List<HdfsFileStatusWithId> baseFiles,
boolean isOriginal,
List<ParsedDelta> parsedDeltas,
List<OrcProto.Type> readerTypes,
- UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) {
+ UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs)
+ throws IOException {
List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(parsedDeltas);
boolean[] covered = new boolean[context.numBuckets];
@@ -2250,6 +2291,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
+ /**
+ *
+ * @param bucket bucket/writer ID for this split of the compaction job
+ */
@Override
public RawReader<OrcStruct> getRawReader(Configuration conf,
boolean collapseEvents,
@@ -2258,25 +2303,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Path baseDirectory,
Path[] deltaDirectory
) throws IOException {
- Reader reader = null;
boolean isOriginal = false;
+ OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(true)
+ .isMajorCompaction(collapseEvents);
if (baseDirectory != null) {//this is NULL for minor compaction
- Path bucketFile = null;
+ //it may also be null if there is no base - only deltas
+ mergerOptions.baseDir(baseDirectory);
if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
- bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
+ isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf));
+ mergerOptions.rootPath(baseDirectory.getParent());
} else {
- /**we don't know which file to start reading -
- * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/
isOriginal = true;
+ mergerOptions.rootPath(baseDirectory);
}
- if(bucketFile != null) {
- reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
- }
}
- OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
- .isCompacting(true)
- .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null);
- return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
+ else {
+ //since we have no base, there must be at least 1 delta, which must be the result of an acid write,
+ //so it must be an immediate child of the partition
+ mergerOptions.rootPath(deltaDirectory[0].getParent());
+ }
+ return new OrcRawRecordMerger(conf, collapseEvents, null, isOriginal,
bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
}