Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/16 01:14:45 UTC
[3/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)
HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/34b0e07a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/34b0e07a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/34b0e07a
Branch: refs/heads/master
Commit: 34b0e07a3bd5002b197b749e3e5a7992e196c237
Parents: 5061683
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Aug 15 18:13:44 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Aug 15 18:13:44 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 9 +-
.../hive/hcatalog/streaming/TestStreaming.java | 147 ++++-
.../streaming/mutate/StreamingAssert.java | 45 +-
.../streaming/mutate/TestMutations.java | 17 +-
.../TransactionalValidationListener.java | 2 -
.../hadoop/hive/ql/exec/FileSinkOperator.java | 2 -
.../hadoop/hive/ql/io/AcidInputFormat.java | 6 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 90 ++-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 41 +-
.../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 2 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 41 +-
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 60 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 20 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 186 ++++---
.../ql/TestTxnCommands2WithSplitUpdate.java | 545 -------------------
...ommands2WithSplitUpdateAndVectorization.java | 4 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 31 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 191 ++++---
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 509 ++++++++++++-----
.../hive/ql/io/orc/TestOrcRecordUpdater.java | 36 +-
.../TestVectorizedOrcAcidRowBatchReader.java | 4 -
.../hive/ql/txn/compactor/CompactorTest.java | 8 +
.../hive/ql/txn/compactor/TestWorker.java | 125 +++--
.../dynamic_semijoin_reduction_3.q | 2 +-
.../llap/dynamic_semijoin_reduction_3.q.out | 4 +-
.../test/results/clientpositive/row__id.q.out | 18 +-
26 files changed, 1086 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b154544..3c158a6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1841,12 +1841,9 @@ public class HiveConf extends Configuration {
" of the lock manager is dumped to log file. This is for debugging. See also " +
"hive.lock.numretries and hive.lock.sleep.between.retries."),
- HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
- "Sets the operational properties that control the appropriate behavior for various\n"
- + "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode\n"
- + "for ACID, while setting it to one will enable a split-update feature found in the newer\n"
- + "version of Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
- + "for future versions of ACID. (See HIVE-14035 for details.)"),
+ HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1,
+ "This is intended to be used as an internal property for future versions of ACID. (See\n" +
+ "HIVE-14035 for details.)"),
HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
"current open transactions reach this limit, future open transaction requests will be \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 5e8fe62..f3ef92b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -27,6 +27,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -44,7 +45,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
@@ -97,6 +99,8 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
+
public class TestStreaming {
private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
@@ -448,7 +452,10 @@ public class TestStreaming {
}
}
-
+ /**
+ * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, String...)} - there is
+ * little value in using InputFormat directly
+ */
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
@@ -473,13 +480,14 @@ public class TestStreaming {
InputFormat inf = new OrcInputFormat();
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionPath.toString());
- job.set("bucket_count", Integer.toString(buckets));
+ job.set(BUCKET_COUNT, Integer.toString(buckets));
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
- job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
+ AcidUtils.setTransactionalTableScan(job,true);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
InputSplit[] splits = inf.getSplits(job, buckets);
- Assert.assertEquals(buckets, splits.length);
+ Assert.assertEquals(numExpectedFiles, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
inf.getRecordReader(splits[0], job, Reporter.NULL);
@@ -491,6 +499,48 @@ public class TestStreaming {
}
Assert.assertEquals(false, rr.next(key, value));
}
+ /**
+ * @param validationQuery query to read from table to compare data against {@code records}
+ * @param records expected data; each row is a CSV list of values
+ */
+ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
+ String validationQuery, boolean vectorize, String... records) throws Exception {
+ ValidTxnList txns = msClient.getValidTxns();
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+ Assert.assertEquals(0, dir.getObsolete().size());
+ Assert.assertEquals(0, dir.getOriginalFiles().size());
+ List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+ System.out.println("Files found: ");
+ for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
+ Assert.assertEquals(numExpectedFiles, current.size());
+
+ // find the absolute min/max transaction ids across the current delta directories
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ for (AcidUtils.ParsedDelta pd : current) {
+ if (pd.getMaxTransaction() > max) max = pd.getMaxTransaction();
+ if (pd.getMinTransaction() < min) min = pd.getMinTransaction();
+ }
+ Assert.assertEquals(minTxn, min);
+ Assert.assertEquals(maxTxn, max);
+ boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+ if(vectorize) {
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ }
+
+ String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
+ for(String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) {
+ //run it with each split strategy - make sure the results are the same under each one
+ conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
+ List<String> actualResult = queryTable(driver, validationQuery);
+ for (int i = 0; i < actualResult.size(); i++) {
+ Assert.assertEquals("diff at [" + i + "]. actual=" + actualResult + " expected=" +
+ Arrays.toString(records), records[i], actualResult.get(i));
+ }
+ }
+ conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
+ }
private void checkNothingWritten(Path partitionPath) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
@@ -1016,15 +1066,15 @@ public class TestStreaming {
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
-
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten2(partLoc, 15, 24, 1, validationQuery, false, "1\tHello streaming");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ checkDataWritten2(partLoc, 15, 24, 1, validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming");
txnBatch.close();
@@ -1034,16 +1084,16 @@ public class TestStreaming {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten2(partLoc, 15, 40, 2, validationQuery, false, "1\tHello streaming",
+ "2\tWelcome to streaming", "3\tHello streaming - once again");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
- "{4, Welcome to streaming - once again}");
+ checkDataWritten2(partLoc, 15, 40, 2, validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming", "3\tHello streaming - once again",
+ "4\tWelcome to streaming - once again");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -1053,7 +1103,6 @@ public class TestStreaming {
connection.close();
}
-
@Test
public void testInterleavedTransactionBatchCommits() throws Exception {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
@@ -1078,32 +1127,69 @@ public class TestStreaming {
txnBatch2.commit();
- checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten2(partLoc, 24, 33, 1,
+ validationQuery, true, "3\tHello streaming - once again");
txnBatch1.commit();
-
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ /*now both batches have committed (but not closed) so for each primary file we expect a side
+ file to exist that indicates the true length of the primary file*/
+ FileSystem fs = partLoc.getFileSystem(conf);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+ for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+ for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+ Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+ Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+ long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+ Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+ lengthFileSize, lengthFileSize > 0);
+ long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+ long actualLength = stat.getLen();
+ Assert.assertTrue("", logicalLength == actualLength);
+ }
+ }
+ checkDataWritten2(partLoc, 14, 33, 2,
+ validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
txnBatch1.beginNextTransaction();
txnBatch1.write("2,Welcome to streaming".getBytes());
txnBatch2.beginNextTransaction();
txnBatch2.write("4,Welcome to streaming - once again".getBytes());
-
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ //here each batch has written data and committed (to bucket0 since table only has 1 bucket)
+ //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
+ //has now received more data (logically - it's buffered) but it is not yet committed.
+ //let's check that the side files exist, etc.
+ dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+ for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+ for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+ Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+ Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+ long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+ Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+ lengthFileSize, lengthFileSize > 0);
+ long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+ long actualLength = stat.getLen();
+ Assert.assertTrue("", logicalLength <= actualLength);
+ }
+ }
+ checkDataWritten2(partLoc, 14, 33, 2,
+ validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
txnBatch1.commit();
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}",
- "{3, Hello streaming - once again}");
+ checkDataWritten2(partLoc, 14, 33, 2,
+ validationQuery, false, "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again");
txnBatch2.commit();
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}",
- "{3, Hello streaming - once again}",
- "{4, Welcome to streaming - once again}");
+ checkDataWritten2(partLoc, 14, 33, 2,
+ validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again",
+ "4\tWelcome to streaming - once again");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch1.getCurrentTransactionState());
@@ -2035,11 +2121,12 @@ public class TestStreaming {
public static ArrayList<String> queryTable(Driver driver, String query)
throws CommandNeedRetryException, IOException {
- driver.run(query);
+ CommandProcessorResponse cpr = driver.run(query);
+ if(cpr.getResponseCode() != 0) {
+ throw new RuntimeException(query + " failed: " + cpr);
+ }
ArrayList<String> res = new ArrayList<String>();
driver.getResults(res);
- if(res.isEmpty())
- System.err.println(driver.getErrorMsg());
return res;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index de41d34..d5429fb 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -27,12 +27,12 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -123,35 +123,50 @@ public class StreamingAssert {
}
List<Record> readRecords() throws Exception {
+ return readRecords(1);
+ }
+
+ /**
+ * TODO: this would be more flexible if it ran a SQL select statement rather than using InputFormat directly
+ * see {@link org.apache.hive.hcatalog.streaming.TestStreaming#checkDataWritten2(Path, long, long, int, String, String...)}
+ * @param numSplitsExpected
+ * @return
+ * @throws Exception
+ */
+ List<Record> readRecords(int numSplitsExpected) throws Exception {
if (currentDeltas.isEmpty()) {
throw new AssertionError("No data");
}
InputFormat<NullWritable, OrcStruct> inputFormat = new OrcInputFormat();
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionLocation.toString());
- job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+ job.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(table.getSd().getNumBuckets()));
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
- job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
+ AcidUtils.setTransactionalTableScan(job,true);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
InputSplit[] splits = inputFormat.getSplits(job, 1);
- assertEquals(1, splits.length);
-
- final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
- .getRecordReader(splits[0], job, Reporter.NULL);
+ assertEquals(numSplitsExpected, splits.length);
- NullWritable key = recordReader.createKey();
- OrcStruct value = recordReader.createValue();
List<Record> records = new ArrayList<>();
- while (recordReader.next(key, value)) {
- RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
- Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+ for(InputSplit is : splits) {
+ final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
+ .getRecordReader(is, job, Reporter.NULL);
+
+ NullWritable key = recordReader.createKey();
+ OrcStruct value = recordReader.createValue();
+
+ while (recordReader.next(key, value)) {
+ RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
+ Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString());
- System.out.println(record);
- records.add(record);
+ System.out.println(record);
+ records.add(record);
+ }
+ recordReader.close();
}
- recordReader.close();
return records;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
index ab9f313..5bfa04d 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
@@ -529,22 +529,23 @@ public class TestMutations {
StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
indiaAssertions.assertMinTransactionId(1L);
indiaAssertions.assertMaxTransactionId(2L);
- List<Record> indiaRecords = indiaAssertions.readRecords();
+ List<Record> indiaRecords = indiaAssertions.readRecords(2);
assertThat(indiaRecords.size(), is(3));
assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
encodeBucket(0), 0L)));
assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}"));
- assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L,
- encodeBucket(0), 1L)));
+ assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(2L,
+ encodeBucket(0), 0L)));//with split update, new version of the row is a new insert
assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L,
- encodeBucket(0), 0L)));
+ encodeBucket(0), 1L)));
StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
ukAssertions.assertMinTransactionId(1L);
ukAssertions.assertMaxTransactionId(2L);
- List<Record> ukRecords = ukAssertions.readRecords();
+ //1 split since mutateTransaction txn just does deletes
+ List<Record> ukRecords = ukAssertions.readRecords(1);
assertThat(ukRecords.size(), is(1));
assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
@@ -553,11 +554,11 @@ public class TestMutations {
StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
franceAssertions.assertMinTransactionId(1L);
franceAssertions.assertMaxTransactionId(2L);
- List<Record> franceRecords = franceAssertions.readRecords();
+ List<Record> franceRecords = franceAssertions.readRecords(2);
assertThat(franceRecords.size(), is(1));
assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}"));
- assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
- encodeBucket(0), 1L)));
+ assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(2L,
+ encodeBucket(0), 0L)));//with split update, new version of the row is a new insert
client.close();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 0f08f43..023d703 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -39,7 +39,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
// These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
- public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
TransactionalValidationListener(Configuration conf) {
super(conf);
@@ -276,7 +275,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
boolean isValid = false;
switch (transactionalProperties) {
case DEFAULT_TRANSACTIONAL_PROPERTY:
- case LEGACY_TRANSACTIONAL_PROPERTY:
isValid = true;
break;
default:
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8999f6f..25ad1e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -148,8 +148,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
RecordWriter[] outWriters;
RecordUpdater[] updaters;
Stat stat;
- int acidLastBucket = -1;
- int acidFileOffset = -1;
public FSPaths(Path specPath) {
tmpPath = Utilities.toTempPath(specPath);
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index 25177ef..9864c76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -112,6 +112,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
private long minTxnId;
private long maxTxnId;
private List<Integer> stmtIds;
+ //would be useful to have enum for Type: insert/delete/load data
public DeltaMetaData() {
this(0,0,new ArrayList<Integer>());
@@ -155,6 +156,11 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
stmtIds.add(in.readInt());
}
}
+ @Override
+ public String toString() {
+ //? is Type - when implemented
+ return "Delta(?," + minTxnId + "," + maxTxnId + "," + stmtIds + ")";
+ }
}
/**
* Options for controlling the record readers.
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 1e33424..feacdd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -39,11 +39,13 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.Ref;
+import org.apache.orc.impl.OrcAcidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +69,18 @@ public class AcidUtils {
};
public static final String DELTA_PREFIX = "delta_";
public static final String DELETE_DELTA_PREFIX = "delete_delta_";
+ /**
+ * Acid Streaming Ingest writes multiple transactions to the same file. It also maintains a
+ * {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} side file which stores the length of
+ * the primary file as of the last commit ({@link OrcRecordUpdater#flush()}). That is the 'logical length'.
+ * Once the primary is closed, the side file is deleted (logical length = actual length) but if
+ * the writer dies or the primary file is being read while its still being written to, anything
+ * past the logical length should be ignored.
+ *
+ * @see org.apache.orc.impl.OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX
+ * @see org.apache.orc.impl.OrcAcidUtils#getLastFlushLength(FileSystem, Path)
+ * @see #getLogicalLength(FileSystem, FileStatus)
+ */
public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
public static final PathFilter deltaFileFilter = new PathFilter() {
@Override
@@ -167,7 +181,7 @@ public class AcidUtils {
* This is format of delete delta dir name prior to Hive 2.2.x
*/
@VisibleForTesting
- static String deleteDeltaSubdir(long min, long max) {
+ public static String deleteDeltaSubdir(long min, long max) {
return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
String.format(DELTA_DIGITS, max);
}
@@ -178,7 +192,7 @@ public class AcidUtils {
* @since 2.2.x
*/
@VisibleForTesting
- static String deleteDeltaSubdir(long min, long max, int statementId) {
+ public static String deleteDeltaSubdir(long min, long max, int statementId) {
return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
}
@@ -371,21 +385,10 @@ public class AcidUtils {
public static final int HASH_BASED_MERGE_BIT = 0x02;
public static final String HASH_BASED_MERGE_STRING = "hash_merge";
public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
- public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY;
private AcidOperationalProperties() {
}
- /**
- * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables
- * that were created before ACID type system using operational properties was put in place.
- * @return the acidOperationalProperties object
- */
- public static AcidOperationalProperties getLegacy() {
- AcidOperationalProperties obj = new AcidOperationalProperties();
- // In legacy mode, none of these properties are turned on.
- return obj;
- }
/**
* Returns an acidOperationalProperties object that represents default ACID behavior for tables
@@ -406,14 +409,11 @@ public class AcidUtils {
*/
public static AcidOperationalProperties parseString(String propertiesStr) {
if (propertiesStr == null) {
- return AcidOperationalProperties.getLegacy();
+ return AcidOperationalProperties.getDefault();
}
if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
return AcidOperationalProperties.getDefault();
}
- if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) {
- return AcidOperationalProperties.getLegacy();
- }
AcidOperationalProperties obj = new AcidOperationalProperties();
String[] options = propertiesStr.split("\\|");
for (String option : options) {
@@ -1119,7 +1119,12 @@ public class AcidUtils {
public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
}
-
+ /**
+ * @param p - not null
+ */
+ public static boolean isDeleteDelta(Path p) {
+ return p.getName().startsWith(DELETE_DELTA_PREFIX);
+ }
/** Checks if a table is a valid ACID table.
* Note, users are responsible for using the correct TxnManager. We do not look at
* SessionState.get().getTxnMgr().supportsAcid() here
@@ -1171,8 +1176,8 @@ public class AcidUtils {
String transactionalProperties = table.getProperty(
hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (transactionalProperties == null) {
- // If the table does not define any transactional properties, we return a legacy type.
- return AcidOperationalProperties.getLegacy();
+ // If the table does not define any transactional properties, we return a default type.
+ return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(transactionalProperties);
}
@@ -1184,7 +1189,7 @@ public class AcidUtils {
*/
public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
// If the conf does not define any transactional properties, the parseInt() should receive
- // a value of zero, which will set AcidOperationalProperties to a legacy type and return that.
+ // a value of 1, which will set AcidOperationalProperties to a default type and return that.
return AcidOperationalProperties.parseInt(
HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
}
@@ -1197,8 +1202,8 @@ public class AcidUtils {
public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (resultStr == null) {
- // If the properties does not define any transactional properties, we return a legacy type.
- return AcidOperationalProperties.getLegacy();
+ // If the properties does not define any transactional properties, we return a default type.
+ return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(resultStr);
}
@@ -1212,9 +1217,44 @@ public class AcidUtils {
Map<String, String> parameters) {
String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (resultStr == null) {
- // If the parameters does not define any transactional properties, we return a legacy type.
- return AcidOperationalProperties.getLegacy();
+ // If the parameters does not define any transactional properties, we return a default type.
+ return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(resultStr);
}
+ /**
+ * See comments at {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX}.
+ *
+ * Returns the logical end of file for an acid data file.
+ *
+ * This relies on the fact that if delta_x_y has no committed transactions it will be filtered out
+ * by {@link #getAcidState(Path, Configuration, ValidTxnList)} and so won't be read at all.
+ * @param file - data file to read/compute splits on
+ */
+ public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
+ Path lengths = OrcAcidUtils.getSideFile(file.getPath());
+ if(!fs.exists(lengths)) {
+ /**
+ * if we get here for delta_x_y, txn y is resolved and all files in this delta are closed, so
+ * they should all have a valid ORC footer and the length reported by the NameNode is good
+ */
+ return file.getLen();
+ }
+ long len = OrcAcidUtils.getLastFlushLength(fs, file.getPath());
+ if(len >= 0) {
+ /**
+ * if we get here, something is still writing to delta_x_y, so read only as far as the last commit,
+ * i.e. where the last footer was written. The file may contain more data after the 'len' position
+ * belonging to an open txn.
+ */
+ return len;
+ }
+ /**
+ * if we get here, the side file exists but we couldn't read it. We want to avoid a read where
+ * file.getLen() < 'value from side file' (which may happen if the file is not closed) because
+ * that would mean some committed data from 'file' is skipped.
+ * This should be very unusual.
+ */
+ throw new IOException(lengths + " found but is not readable. Consider waiting or using orcfiledump --recover");
+ }
}
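Since getLogicalLength() is what the split generation and footer reads in OrcInputFormat below are switched to, here is a minimal illustrative sketch of the intended read-side usage; 'path' and 'conf' are assumed to be in scope and the snippet is not part of the commit:

  FileSystem fs = path.getFileSystem(conf);
  FileStatus bucketFile = fs.getFileStatus(path);
  // side-file aware: returns the length as of the last commit if a <bucket>_flush_length file exists
  long logicalLen = AcidUtils.getLogicalLength(fs, bucketFile);
  if (logicalLen == 0) {
    // nothing committed yet - skip this file entirely
  } else {
    // never read (or plan splits) past logicalLen; bytes beyond it belong to open transactions
    Reader reader = OrcFile.createReader(path,
        OrcFile.readerOptions(conf).filesystem(fs).maxLength(logicalLen));
  }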
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index de49fc8..17f3d02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -641,6 +642,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(),
// & therefore we should be able to retrieve them here and determine appropriate behavior.
// Note that this will be meaningless for non-acid tables & will be set to null.
+ //this is set by Utilities.copyTablePropertiesToConf()
boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
this.acidOperationalProperties = isTableTransactional ?
@@ -972,16 +974,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<OrcSplit> splits = Lists.newArrayList();
for (HdfsFileStatusWithId file : fileStatuses) {
FileStatus fileStatus = file.getFileStatus();
- if (fileStatus.getLen() != 0) {
+ long logicalLen = AcidUtils.getLogicalLength(fs, fileStatus);
+ if (logicalLen != 0) {
Object fileKey = file.getFileId();
if (fileKey == null && allowSyntheticFileIds) {
fileKey = new SyntheticFileId(fileStatus);
}
TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
+ if(entry.getKey() + entry.getValue().getLength() > logicalLen) {
+ //don't create splits for anything past logical EOF
+ continue;
+ }
OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
- deltas, -1, fileStatus.getLen());
+ deltas, -1, logicalLen);
splits.add(orcSplit);
}
}
@@ -1002,13 +1009,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* ACID split strategy is used when there is no base directory (when transactions are enabled).
*/
static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
- Path dir;
- List<DeltaMetaData> deltas;
- boolean[] covered;
- int numBuckets;
- AcidOperationalProperties acidOperationalProperties;
+ private Path dir;
+ private List<DeltaMetaData> deltas;
+ private boolean[] covered;
+ private int numBuckets;
+ private AcidOperationalProperties acidOperationalProperties;
- public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
+ ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
AcidOperationalProperties acidOperationalProperties) {
this.dir = dir;
this.numBuckets = numBuckets;
@@ -1027,18 +1034,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// with valid user payload data has already been considered as base for the covered buckets.
// Hence, the uncovered buckets do not have any relevant data and we can just ignore them.
if (acidOperationalProperties != null && acidOperationalProperties.isSplitUpdate()) {
- return splits; // return an empty list.
+ return Collections.emptyList();
}
// Generate a split for any buckets that weren't covered.
// This happens in the case where a bucket just has deltas and no
// base.
if (!deltas.isEmpty()) {
- for (int b = 0; b < numBuckets; ++b) {
- if (!covered[b]) {
- splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1, -1));
- }
- }
+ //since HIVE-17089 if here, then it's not an acid table so there should never be any deltas
+ throw new IllegalStateException("Found unexpected deltas: " + deltas + " in " + dir);
}
return splits;
}
@@ -1133,7 +1137,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (val == null || val) {
try {
List<HdfsFileStatusWithId> insertDeltaFiles =
- SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+ SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
}
@@ -1149,7 +1153,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
// Fall back to regular API and create statuses without ID.
- List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
for (FileStatus child : children) {
HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
@@ -1402,7 +1406,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
populateAndCacheStripeDetails();
boolean[] includeStripe = null;
// We can't eliminate stripes if there are deltas because the
- // deltas may change the rows making them match the predicate.
+ // deltas may change the rows making them match the predicate. todo: See HIVE-14516.
if ((deltas == null || deltas.isEmpty()) && context.sarg != null) {
String[] colNames =
extractNeededColNames((readerTypes == null ? fileTypes : readerTypes),
@@ -1516,7 +1520,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Reader orcReader = OrcFile.createReader(file.getPath(),
OrcFile.readerOptions(context.conf)
.filesystem(fs)
- .maxLength(file.getLen()));
+ .maxLength(AcidUtils.getLogicalLength(fs, file)));
orcTail = new OrcTail(orcReader.getFileTail(), orcReader.getSerializedFileFooter(),
file.getModificationTime());
if (context.cacheStripeDetails) {
@@ -2210,7 +2214,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
totalFileSize += child.getFileStatus().getLen();
AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
(child.getFileStatus().getPath(), context.conf);
- opts.writingBase(true);
int b = opts.getBucketId();
// If the bucket is in the valid range, mark it as covered.
// I wish Hive actually enforced bucketing all of the time.
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index a179300..214f22a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -296,7 +296,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
.rowIndexStride(0);
}
final OrcRecordUpdater.KeyIndexBuilder watcher =
- new OrcRecordUpdater.KeyIndexBuilder();
+ new OrcRecordUpdater.KeyIndexBuilder("compactor");
opts.inspector(options.getInspector())
.callback(watcher);
final Writer writer = OrcFile.createWriter(filename, opts);
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 814782a..97c4e3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -24,6 +24,7 @@ import java.util.TreeMap;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.orc.OrcUtils;
import org.apache.orc.StripeInformation;
@@ -310,11 +311,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
OrcStruct nextRecord;
private final ReaderKey key;
final int bucketId;
+ final int bucketProperty;
- OriginalReaderPair(ReaderKey key, int bucketId) throws IOException {
+ OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException {
this.key = key;
this.bucketId = bucketId;
assert bucketId >= 0 : "don't support non-bucketed tables yet";
+ this.bucketProperty = encodeBucketId(conf, bucketId);
}
@Override public final OrcStruct nextRecord() {
return nextRecord;
@@ -348,7 +351,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
new LongWritable(0));
nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
- new IntWritable(bucketId));
+ new IntWritable(bucketProperty));
nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
new LongWritable(nextRowId));
nextRecord().setFieldValue(OrcRecordUpdater.ROW,
@@ -360,7 +363,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
.set(0);
((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
- .set(bucketId);
+ .set(bucketProperty);
((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
.set(0);
((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
@@ -368,7 +371,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
nextRecord().setFieldValue(OrcRecordUpdater.ROW,
getRecordReader().next(OrcRecordUpdater.getRow(next)));
}
- key.setValues(0L, bucketId, nextRowId, 0L, 0);
+ key.setValues(0L, bucketProperty, nextRowId, 0L, 0);
if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("key " + key + " > maxkey " + getMaxKey());
@@ -380,6 +383,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return false;//reached EndOfFile
}
}
+ static int encodeBucketId(Configuration conf, int bucketId) {
+ return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId));
+ }
@VisibleForTesting
final static class OriginalReaderPairToRead extends OriginalReaderPair {
private final long rowIdOffset;
@@ -392,7 +398,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
final RecordIdentifier minKey, final RecordIdentifier maxKey,
Reader.Options options, Options mergerOptions, Configuration conf,
ValidTxnList validTxnList) throws IOException {
- super(key, bucketId);
+ super(key, bucketId, conf);
this.reader = reader;
assert !mergerOptions.isCompacting();
assert mergerOptions.getRootPath() != null : "Since we have original files";
@@ -444,8 +450,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if (rowIdOffset > 0) {
//rowIdOffset could be 0 if all files before current one are empty
/**
- * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
- * int, Reader.Options)} need to fix min/max key since these are used by
+ * Since we have already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)}
+ * we need to fix the min/max keys since these are used by
* {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
* the key. Clear? */
if (minKey != null) {
@@ -455,7 +461,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* If this is not the 1st file, set minKey 1 less than the start of current file
* (Would not need to set minKey if we knew that there are no delta files)
* {@link #advanceToMinKey()} needs this */
- newMinKey = new RecordIdentifier(0, bucketId, rowIdOffset - 1);
+ newMinKey = new RecordIdentifier(0, bucketProperty,rowIdOffset - 1);
}
if (maxKey != null) {
maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
@@ -485,7 +491,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* of the file so we want to leave it blank to make sure any insert events in delta
* files are included; Conversely, if it's not the last file, set the maxKey so that
* events from deltas that don't modify anything in the current split are excluded*/
- newMaxKey = new RecordIdentifier(0, bucketId,
+ newMaxKey = new RecordIdentifier(0, bucketProperty,
rowIdOffset + reader.getNumberOfRows() - 1);
}
this.minKey = newMinKey;
@@ -536,7 +542,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
OriginalReaderPairToCompact(ReaderKey key, int bucketId,
Reader.Options options, Options mergerOptions, Configuration conf,
ValidTxnList validTxnList) throws IOException {
- super(key, bucketId);
+ super(key, bucketId, conf);
assert mergerOptions.isCompacting() : "Should only be used for Compaction";
this.conf = conf;
this.options = options;
@@ -651,8 +657,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* @throws IOException
*/
private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
- Reader.Options options
- ) throws IOException {
+ Reader.Options options,
+ Configuration conf) throws IOException {
long rowLength = 0;
long rowOffset = 0;
long offset = options.getOffset();//this would usually be at block boundary
@@ -660,6 +666,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean isTail = true;
RecordIdentifier minKey = null;
RecordIdentifier maxKey = null;
+ int bucketProperty = encodeBucketId(conf, bucket);
/**
* options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
* necessarily match stripe boundary. So we want to come up with minKey to be one before the 1st
@@ -679,10 +686,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
}
if (rowOffset > 0) {
- minKey = new RecordIdentifier(0, bucket, rowOffset - 1);
+ minKey = new RecordIdentifier(0, bucketProperty, rowOffset - 1);
}
if (!isTail) {
- maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1);
+ maxKey = new RecordIdentifier(0, bucketProperty, rowOffset + rowLength - 1);
}
return new KeyInterval(minKey, maxKey);
}
@@ -829,7 +836,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
KeyInterval keyInterval;
// find the min/max based on the offset and length (and more for 'original')
if (isOriginal) {
- keyInterval = discoverOriginalKeyBounds(reader, bucket, options);
+ keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
} else {
keyInterval = discoverKeyBounds(reader, options);
}
@@ -865,6 +872,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
eventOptions.range(0, Long.MAX_VALUE);
if (deltaDirectory != null) {
for(Path delta: deltaDirectory) {
+ if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) {
+ //all inserts should be in baseReader for normal reads, so this should always be a delete delta if not compacting
+ throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
+ }
ReaderKey key = new ReaderKey();
Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
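The new encodeBucketId() means the bucket component of ROW__ID now carries the BucketCodec.V1-encoded "bucket property" rather than the raw bucket number, which is why the TestTxnCommands assertions further down expect 536870912 and 536936448. A minimal sketch reproducing those values with the same calls used in this diff ('hiveConf' is assumed to be an initialized HiveConf):

  int b0 = BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0));
  int b1 = BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1));
  // b0 == 536870912 (0x20000000) and b1 == 536936448 (0x20010000), consistent with a layout where
  // the codec version occupies the high bits and the bucket id is shifted into its own bit range,
  // so raw bucket numbers no longer appear verbatim in ROW__ID.bucketid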
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index c30e8fe..429960b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -90,6 +90,7 @@ public class OrcRecordUpdater implements RecordUpdater {
private Path deleteEventPath;
private final FileSystem fs;
private OrcFile.WriterOptions writerOptions;
+ private OrcFile.WriterOptions deleteWriterOptions;
private Writer writer = null;
private boolean writerClosed = false;
private Writer deleteEventWriter = null;
@@ -104,7 +105,7 @@ public class OrcRecordUpdater implements RecordUpdater {
// This records how many rows have been inserted or deleted. It is separate from insertedRows
// because that is monotonically increasing to give new unique row ids.
private long rowCountDelta = 0;
- private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+ private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder("insert");
private KeyIndexBuilder deleteEventIndexBuilder;
private StructField recIdField = null; // field to look for the record identifier in
private StructField rowIdField = null; // field inside recId to look for row id in
@@ -148,20 +149,23 @@ public class OrcRecordUpdater implements RecordUpdater {
/**
* An extension to AcidOutputFormat that allows users to add additional
* options.
+ *
+ * todo: since this is only used for testing could we not control the writer some other way?
+ * to simplify {@link #OrcRecordUpdater(Path, AcidOutputFormat.Options)}
*/
- public static class OrcOptions extends AcidOutputFormat.Options {
+ final static class OrcOptions extends AcidOutputFormat.Options {
OrcFile.WriterOptions orcOptions = null;
- public OrcOptions(Configuration conf) {
+ OrcOptions(Configuration conf) {
super(conf);
}
- public OrcOptions orcOptions(OrcFile.WriterOptions opts) {
+ OrcOptions orcOptions(OrcFile.WriterOptions opts) {
this.orcOptions = opts;
return this;
}
- public OrcFile.WriterOptions getOrcOptions() {
+ OrcFile.WriterOptions getOrcOptions() {
return orcOptions;
}
}
@@ -205,6 +209,7 @@ public class OrcRecordUpdater implements RecordUpdater {
this.acidOperationalProperties =
AcidUtils.getAcidOperationalProperties(options.getConfiguration());
}
+ assert this.acidOperationalProperties.isSplitUpdate() : "HIVE-17089?!";
BucketCodec bucketCodec = BucketCodec.V1;
if(options.getConfiguration() != null) {
//so that we can test "old" files
@@ -240,6 +245,8 @@ public class OrcRecordUpdater implements RecordUpdater {
&& !options.isWritingBase()){
flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8,
options.getReporter());
+ flushLengths.writeLong(0);
+ OrcInputFormat.SHIMS.hflush(flushLengths);
} else {
flushLengths = null;
}
@@ -265,12 +272,26 @@ public class OrcRecordUpdater implements RecordUpdater {
optionsCloneForDelta.getConfiguration());
}
if (this.acidOperationalProperties.isSplitUpdate()) {
+ AcidOutputFormat.Options deleteOptions = options.clone().writingDeleteDelta(true);
// If this is a split-update, we initialize a delete delta file path in anticipation that
// they would write update/delete events to that separate file.
// This writes to a file in directory which starts with "delete_delta_..."
- // The actual initialization of a writer only happens if any delete events are written.
- this.deleteEventPath = AcidUtils.createFilename(path,
- optionsCloneForDelta.writingDeleteDelta(true));
+ // The actual initialization of a writer only happens if any delete events are written
+ //to avoid empty files.
+ this.deleteEventPath = AcidUtils.createFilename(path, deleteOptions);
+ /**
+ * HIVE-14514 is not done so we can't clone writerOptions(). So here we create a new
+ * options object to make sure insert and delete writers don't share them (like the
+ * callback object, for example)
+ * In any case insert writer and delete writer would most likely have very different
+ * characteristics - delete writer only writes a tiny amount of data. Once we do early
+ * update split, each {@link OrcRecordUpdater} will have only 1 writer. (except for Mutate API)
+ * Then it would perhaps make sense to take writerOptions as input - how?
+ */
+ this.deleteWriterOptions = OrcFile.writerOptions(optionsCloneForDelta.getTableProperties(),
+ optionsCloneForDelta.getConfiguration());
+ this.deleteWriterOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+ options.getRecordIdColumn())));
}
// get buffer size and stripe size for base writer
@@ -377,19 +398,10 @@ public class OrcRecordUpdater implements RecordUpdater {
recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
// Initialize a deleteEventWriter if not yet done. (Lazy initialization)
if (deleteEventWriter == null) {
- // Initialize an indexBuilder for deleteEvents.
- deleteEventIndexBuilder = new KeyIndexBuilder();
- // Change the indexBuilder callback too for the deleteEvent file, the remaining writer
- // options remain the same.
-
- // TODO: When we change the callback, we are essentially mutating the writerOptions.
- // This works but perhaps is not a good thing. The proper way to do this would be
- // to clone the writerOptions, however it requires that the parent OrcFile.writerOptions
- // implements a clone() method (which it does not for now). HIVE-14514 is currently an open
- // JIRA to fix this.
-
+ // Initialize an indexBuilder for deleteEvents. (HIVE-17284)
+ deleteEventIndexBuilder = new KeyIndexBuilder("delete");
this.deleteEventWriter = OrcFile.createWriter(deleteEventPath,
- writerOptions.callback(deleteEventIndexBuilder));
+ deleteWriterOptions.callback(deleteEventIndexBuilder));
}
// A delete/update generates a delete event for the original row.
@@ -461,6 +473,8 @@ public class OrcRecordUpdater implements RecordUpdater {
long len = writer.writeIntermediateFooter();
flushLengths.writeLong(len);
OrcInputFormat.SHIMS.hflush(flushLengths);
+ //multiple transactions only happen for streaming ingest which only allows inserts
+ assert deleteEventWriter == null : "unexpected delete writer for " + path;
}
@Override
@@ -539,12 +553,16 @@ public class OrcRecordUpdater implements RecordUpdater {
}
static class KeyIndexBuilder implements OrcFile.WriterCallback {
- StringBuilder lastKey = new StringBuilder();
+ private final String builderName;
+ StringBuilder lastKey = new StringBuilder();//list of last keys for each stripe
long lastTransaction;
int lastBucket;
long lastRowId;
AcidStats acidStats = new AcidStats();
+ KeyIndexBuilder(String name) {
+ this.builderName = name;
+ }
@Override
public void preStripeWrite(OrcFile.WriterContext context
) throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c50c1a8..bff9884 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -859,15 +861,19 @@ public class TestTxnCommands {
for(String s : rs) {
LOG.warn(s);
}
+ Assert.assertEquals(536870912,
+ BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
+ Assert.assertEquals(536936448,
+ BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
Assert.assertEquals("", 4, rs.size());
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000014_0000014_0000/bucket_00001"));
//run Compaction
runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
@@ -877,11 +883,11 @@ public class TestTxnCommands {
LOG.warn(s);
}
Assert.assertEquals("", 4, rs.size());
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));