Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/16 01:14:43 UTC
[1/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 506168370 -> 34b0e07a3
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index ba8d675..82cf108 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -18,11 +18,20 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.MemoryManager;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.MemoryManagerImpl;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -58,7 +67,10 @@ import org.mockito.Mockito;
import com.google.common.collect.Lists;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -283,28 +295,30 @@ public class TestOrcRawRecordMerger {
@Test
public void testOriginalReaderPair() throws Exception {
+ int BUCKET = 10;
ReaderKey key = new ReaderKey();
+ Configuration conf = new Configuration();
+ int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
Reader reader = createMockOriginalReader();
- RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
- RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
+ RecordIdentifier minKey = new RecordIdentifier(0, bucketProperty, 1);
+ RecordIdentifier maxKey = new RecordIdentifier(0, bucketProperty, 3);
boolean[] includes = new boolean[]{true, true};
- Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Path root = new Path(tmpDir, "testOriginalReaderPair");
fs.makeQualified(root);
fs.create(root);
- ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, minKey, maxKey,
+ ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, minKey, maxKey,
new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
RecordReader recordReader = pair.getRecordReader();
assertEquals(0, key.getTransactionId());
- assertEquals(10, key.getBucketProperty());
+ assertEquals(bucketProperty, key.getBucketProperty());
assertEquals(2, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
assertEquals("third", value(pair.nextRecord()));
pair.next(pair.nextRecord());
assertEquals(0, key.getTransactionId());
- assertEquals(10, key.getBucketProperty());
+ assertEquals(bucketProperty, key.getBucketProperty());
assertEquals(3, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
assertEquals("fourth", value(pair.nextRecord()));
@@ -320,46 +334,48 @@ public class TestOrcRawRecordMerger {
@Test
public void testOriginalReaderPairNoMin() throws Exception {
+ int BUCKET = 10;
ReaderKey key = new ReaderKey();
Reader reader = createMockOriginalReader();
Configuration conf = new Configuration();
+ int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
FileSystem fs = FileSystem.getLocal(conf);
Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
fs.makeQualified(root);
fs.create(root);
- ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, null, null,
+ ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, null, null,
new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
assertEquals("first", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
- assertEquals(10, key.getBucketProperty());
+ assertEquals(bucketProperty, key.getBucketProperty());
assertEquals(0, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
pair.next(pair.nextRecord());
assertEquals("second", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
- assertEquals(10, key.getBucketProperty());
+ assertEquals(bucketProperty, key.getBucketProperty());
assertEquals(1, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
pair.next(pair.nextRecord());
assertEquals("third", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
- assertEquals(10, key.getBucketProperty());
+ assertEquals(bucketProperty, key.getBucketProperty());
assertEquals(2, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
pair.next(pair.nextRecord());
assertEquals("fourth", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
- assertEquals(10, key.getBucketProperty());
+ assertEquals(bucketProperty, key.getBucketProperty());
assertEquals(3, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
pair.next(pair.nextRecord());
assertEquals("fifth", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
- assertEquals(10, key.getBucketProperty());
+ assertEquals(bucketProperty, key.getBucketProperty());
assertEquals(4, key.getRowId());
assertEquals(0, key.getCurrentTransactionId());
@@ -506,6 +522,53 @@ public class TestOrcRawRecordMerger {
return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
}
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ /**
+ * {@link org.apache.hive.hcatalog.streaming.TestStreaming#testInterleavedTransactionBatchCommits} has more tests
+ */
+ @Test
+ public void testGetLogicalLength() throws Exception {
+ final int BUCKET = 0;
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ OrcOutputFormat of = new OrcOutputFormat();
+ Path root = new Path(tmpDir, "testEmpty").makeQualified(fs);
+ fs.delete(root, true);
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ /*create delta_1_1_0/bucket0 with 1 row and close the file*/
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .inspector(inspector).bucket(BUCKET).writingBase(false).minimumTransactionId(1)
+ .maximumTransactionId(1).finalDestination(root);
+ Path delta1_1_0 = new Path(root, AcidUtils.deltaSubdir(
+ options.getMinimumTransactionId(), options.getMaximumTransactionId(), options.getStatementId()));
+ Path bucket0 = AcidUtils.createBucketFile(delta1_1_0, BUCKET);
+ Path bucket0SideFile = OrcAcidUtils.getSideFile(bucket0);
+
+ RecordUpdater ru = of.getRecordUpdater(root, options);
+ ru.insert(options.getMaximumTransactionId(), new MyRow("first"));
+ ru.close(false);
+
+ FileStatus bucket0File = fs.getFileStatus(bucket0);
+ AcidUtils.getLogicalLength(fs, bucket0File);
+ Assert.assertTrue("no " + bucket0, fs.exists(bucket0));
+ Assert.assertFalse("unexpected " + bucket0SideFile, fs.exists(bucket0SideFile));
+ //test getLogicalLength() w/o side file
+ Assert.assertEquals("closed file size mismatch", bucket0File.getLen(),
+ AcidUtils.getLogicalLength(fs, bucket0File));
+
+ //create an empty (invalid) side file - make sure getLogicalLength() throws
+ FSDataOutputStream flushLengths = fs.create(bucket0SideFile, true, 8);
+ flushLengths.close();
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage(bucket0SideFile.getName() + " found but is not readable");
+ AcidUtils.getLogicalLength(fs, bucket0File);
+ }
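In outline, the contract this new test exercises - a sketch, with the side-file naming inferred from the bucket_00001_flush_length literal later in this file:

    // Without a side file, the logical length is just the physical length.
    FileStatus stat = fs.getFileStatus(bucket0);
    long len = AcidUtils.getLogicalLength(fs, stat);   // == stat.getLen()
    // An empty (0-byte) side file such as bucket_00000_flush_length is
    // invalid: getLogicalLength() is expected to throw IOException.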
@Test
public void testEmpty() throws Exception {
final int BUCKET = 0;
@@ -525,7 +588,16 @@ public class TestOrcRawRecordMerger {
.inspector(inspector).bucket(BUCKET).writingBase(true)
.maximumTransactionId(100).finalDestination(root);
of.getRecordUpdater(root, options).close(false);
-
+ {
+ /*OrcRecordUpdater is inconsistent about when it creates empty files and when it does not.
+ This creates an empty bucket. HIVE-17138*/
+ OrcFile.WriterOptions wo = OrcFile.writerOptions(conf);
+ wo.inspector(inspector);
+ wo.callback(new OrcRecordUpdater.KeyIndexBuilder("testEmpty"));
+ Writer w = OrcFile.createWriter(AcidUtils.createBucketFile(new Path(root,
+ AcidUtils.baseDir(100)), BUCKET), wo);
+ w.close();
+ }
ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE);
AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
@@ -600,52 +672,58 @@ public class TestOrcRawRecordMerger {
assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
assertEquals(new Path(root, use130Format ?
- AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
+ AcidUtils.deleteDeltaSubdir(200,200,0) : AcidUtils.deleteDeltaSubdir(200,200)),
directory.getCurrentDirectories().get(0).getPath());
+ assertEquals(new Path(root, use130Format ?
+ AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
+ directory.getCurrentDirectories().get(1).getPath());
Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
BUCKET);
+ Path deltaPath = AcidUtils.createBucketFile(directory.getCurrentDirectories().get(1).getPath(),
+ BUCKET);
+ Path deleteDeltaDir = directory.getCurrentDirectories().get(0).getPath();
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+ AcidUtils.setTransactionalTableScan(conf,true);
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+ //the first "split" is for base/
Reader baseReader = OrcFile.createReader(basePath,
OrcFile.readerOptions(conf));
OrcRawRecordMerger merger =
new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
- AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
+ new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
assertEquals(null, merger.getMinKey());
assertEquals(null, merger.getMaxKey());
RecordIdentifier id = merger.createKey();
OrcStruct event = merger.createValue();
assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
- assertEquals("update 1", getValue(event));
- assertFalse(merger.isDelete(event));
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
assertEquals("second", getValue(event));
- assertFalse(merger.isDelete(event));
assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
- assertEquals("update 2", getValue(event));
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
- assertEquals("update 3", getValue(event));
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -667,14 +745,13 @@ public class TestOrcRawRecordMerger {
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
assertNull(OrcRecordUpdater.getRow(event));
- assertTrue(merger.isDelete(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
assertNull(OrcRecordUpdater.getRow(event));
@@ -687,109 +764,205 @@ public class TestOrcRawRecordMerger {
assertEquals(false, merger.next(id, event));
merger.close();
- // make a merger that doesn't collapse events
- merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
- createMaximalTxnList(), new Reader.Options(),
- AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
+ //second "split" is delta_200_200
+ baseReader = OrcFile.createReader(deltaPath,
+ OrcFile.readerOptions(conf));
+ merger =
+ new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+ createMaximalTxnList(), new Reader.Options(),
+ new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
+ assertEquals(null, merger.getMinKey());
+ assertEquals(null, merger.getMaxKey());
assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
assertEquals("update 1", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
+ assertEquals("update 2", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
+ assertEquals("update 3", getValue(event));
+
+ assertEquals(false, merger.next(id, event));
+ merger.close();
+
+ //now run as if it's a minor compaction so we don't collapse events
+ //it's not exactly like minor compaction since MC would not have a baseReader
+ //here there is only 1 "split" since we only have data for 1 bucket
+ baseReader = OrcFile.createReader(basePath,
+ OrcFile.readerOptions(conf));
+ merger =
+ new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+ createMaximalTxnList(), new Reader.Options(),
+ AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
+ assertEquals(null, merger.getMinKey());
+ assertEquals(null, merger.getMaxKey());
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
+ assertNull(OrcRecordUpdater.getRow(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 0), id);
assertEquals("first", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
assertEquals("second", getValue(event));
assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
- assertEquals("update 2", getValue(event));
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 0), id);
assertEquals("third", getValue(event));
assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
- assertEquals("update 3", getValue(event));
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 0), id);
assertEquals("fourth", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id);
assertEquals("fifth", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id);
assertEquals("sixth", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id);
assertEquals("seventh", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 0), id);
assertEquals("eighth", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
assertNull(OrcRecordUpdater.getRow(event));
+
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 0), id);
assertEquals("ninth", getValue(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
+ OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
assertEquals("tenth", getValue(event));
+ //data from delta_200_200
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
+ assertEquals("update 1", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
+ assertEquals("update 2", getValue(event));
+
+ assertEquals(true, merger.next(id, event));
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+ OrcRecordUpdater.getOperation(event));
+ assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
+ assertEquals("update 3", getValue(event));
+
assertEquals(false, merger.next(id, event));
merger.close();
+
// try ignoring the 200 transaction and make sure it works still
ValidTxnList txns = new ValidReadTxnList("2000:200:200");
+ //again 1st split is for base/
+ baseReader = OrcFile.createReader(basePath,
+ OrcFile.readerOptions(conf));
merger =
- new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
- txns, new Reader.Options(),
- AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
+ new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+ txns, new Reader.Options(),
+ new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
+
+ assertEquals(null, merger.getMinKey());
+ assertEquals(null, merger.getMaxKey());
+
for(int i=0; i < values.length; ++i) {
assertEquals(true, merger.next(id, event));
LOG.info("id = " + id + "event = " + event);
@@ -798,7 +971,19 @@ public class TestOrcRawRecordMerger {
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, i, 0), id);
assertEquals(values[i], getValue(event));
}
+ assertEquals(false, merger.next(id, event));
+ merger.close();
+
+ // 2nd split is for delta_200_200 which is filtered out entirely by "txns"
+ baseReader = OrcFile.createReader(deltaPath,
+ OrcFile.readerOptions(conf));
+ merger =
+ new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+ txns, new Reader.Options(),
+ new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
+ assertEquals(null, merger.getMinKey());
+ assertEquals(null, merger.getMaxKey());
assertEquals(false, merger.next(id, event));
merger.close();
}
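To summarize the reworked read path: under acid 2.0 every data file (the base/ bucket, then the delta_200_200 bucket) is its own "split", and each split is merged against the same delete_delta directory. A condensed sketch of the pattern, lifted from the hunks above, plus a hedged reading of the txn-list literals (format assumed to be highWatermark:minOpenTxn[:excluded txns...]):

    // One merger per data file, each fed the shared delete deltas.
    Reader dataReader = OrcFile.createReader(basePath, OrcFile.readerOptions(conf));
    OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, true, dataReader, false,
        BUCKET, createMaximalTxnList(), new Reader.Options(),
        new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
    // DELETE events surface with a null row payload; INSERT events carry the data.

    ValidTxnList all   = new ValidReadTxnList("200:" + Long.MAX_VALUE); // txns up to 200 valid, none open
    ValidTxnList no200 = new ValidReadTxnList("2000:200:200");          // txn 200 excluded ("ignored")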
@@ -846,6 +1031,7 @@ public class TestOrcRawRecordMerger {
* Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
* a base and a delta.
* @throws Exception
+ * @see #testRecordReaderNewBaseAndDelta()
*/
@Test
public void testRecordReaderOldBaseAndDelta() throws Exception {
@@ -891,63 +1077,90 @@ public class TestOrcRawRecordMerger {
.writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
.bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5)
.finalDestination(root);
+
+ final int BUCKET_PROPERTY = BucketCodec.V1.encode(options);
+
RecordUpdater ru = of.getRecordUpdater(root, options);
values = new String[]{"0.0", null, null, "1.1", null, null, null,
"ignore.7"};
for(int i=0; i < values.length; ++i) {
if (values[i] != null) {
- ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+ ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
}
}
- ru.delete(100, new BigRow(9, 0, BUCKET));
- ru.close(false);
+ ru.delete(1, new BigRow(9, 0, BUCKET_PROPERTY));
+ ru.close(false);//this doesn't create a key index presumably because writerOptions are not set on 'options'
// write a delta
- options = options.minimumTransactionId(2).maximumTransactionId(2);
+ options = options.minimumTransactionId(100).maximumTransactionId(100);
ru = of.getRecordUpdater(root, options);
values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
- for(int i=0; i < values.length; ++i) {
+ for(int i=0; i < values.length - 1; ++i) {
if (values[i] != null) {
- ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+ ru.update(100, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
}
}
- ru.delete(100, new BigRow(8, 0, BUCKET));
+ //do this before next update so that delete_delta is properly sorted
+ ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY));
+ //because row 8 was updated and thus has a different RecordIdentifier now
+ ru.update(100, new BigRow(7, 7, values[values.length - 1], 7, 7, 2, 1, BUCKET_PROPERTY));
+
ru.close(false);
+ MyResult[] expected = new MyResult[10];
+ int k = 0;
+ expected[k++] = new MyResult(0, "0.0");
+ expected[k++] = new MyResult(1, "0.1");
+ expected[k++] = new MyResult(2, "1.0");
+ expected[k++] = new MyResult(3, "1.1");
+ expected[k++] = new MyResult(4, "2.0");
+ expected[k++] = new MyResult(5, "2.1");
+ expected[k++] = new MyResult(6, "3.0");
+ expected[k] = new MyResult(7, "3.1");
+
InputFormat inf = new OrcInputFormat();
JobConf job = new JobConf();
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
- HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+ AcidUtils.setTransactionalTableScan(job,true);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
job.set("mapred.min.split.size", "1");
job.set("mapred.max.split.size", "2");
job.set("mapred.input.dir", root.toString());
InputSplit[] splits = inf.getSplits(job, 5);
- assertEquals(5, splits.length);
+ assertEquals(7, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
- // loop through the 5 splits and read each
- for(int i=0; i < 4; ++i) {
- System.out.println("starting split " + i);
- rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+ for(InputSplit split : splits) {
+ rr = inf.getRecordReader(split, job, Reporter.NULL);
NullWritable key = rr.createKey();
OrcStruct value = rr.createValue();
-
- // there should be exactly two rows per a split
- for(int j=0; j < 2; ++j) {
- System.out.println("i = " + i + ", j = " + j);
- assertEquals(true, rr.next(key, value));
- System.out.println("record = " + value);
- assertEquals(i + "." + j, value.getFieldValue(2).toString());
+ while(rr.next(key, value)) {
+ MyResult mr = new MyResult(Integer.parseInt(value.getFieldValue(0).toString()), value.getFieldValue(2).toString());
+ int i = 0;
+ for(; i < expected.length; i++) {
+ if(mr.equals(expected[i])) {
+ expected[i] = null;
+ break;
+ }
+ }
+ if(i >= expected.length) {
+ //not found
+ assertTrue("Found unexpected row: " + mr, false );
+ }
}
- assertEquals(false, rr.next(key, value));
}
- rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
- assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+ for(MyResult mr : expected) {
+ assertTrue("Expected " + mr + " not found in any InputSplit", mr == null);
+ }
}
/**
* Test the RecordReader when there is a new base and a delta.
+ * This test creates multiple stripes in both base and delta files which affects how many splits
+ * are created on read. With ORC-228 this could be done in E2E fashion with a query or
+ * streaming ingest writing data.
+ * @see #testRecordReaderOldBaseAndDelta()
* @throws Exception
*/
@Test
@@ -1009,20 +1222,33 @@ public class TestOrcRawRecordMerger {
ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
}
}
- ru.delete(100, new BigRow(9, 0, BUCKET_PROPERTY));
+ ru.delete(1, new BigRow(9, 0, BUCKET_PROPERTY));
ru.close(false);
// write a delta
- options.minimumTransactionId(2).maximumTransactionId(2);
+ options.minimumTransactionId(100).maximumTransactionId(100);
ru = of.getRecordUpdater(root, options);
values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
- for(int i=0; i < values.length; ++i) {
+ for(int i=0; i < values.length - 1; ++i) {
if (values[i] != null) {
- ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
+ ru.update(100, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
}
}
+ //do this before next update so that delete_delta is properly sorted
ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY));
+ //because row 8 was updated and thus has a different RecordIdentifier now
+ ru.update(100, new BigRow(7, 7, values[values.length - 1], 7, 7, 2, 1, BUCKET_PROPERTY));
ru.close(false);
+ MyResult[] expected = new MyResult[10];
+ int k = 0;
+ expected[k++] = new MyResult(0, "0.0");
+ expected[k++] = new MyResult(1, "0.1");
+ expected[k++] = new MyResult(2, "1.0");
+ expected[k++] = new MyResult(3, "1.1");
+ expected[k++] = new MyResult(4, "2.0");
+ expected[k++] = new MyResult(5, "2.1");
+ expected[k++] = new MyResult(6, "3.0");
+ expected[k] = new MyResult(7, "3.1");
InputFormat inf = new OrcInputFormat();
JobConf job = new JobConf();
@@ -1031,31 +1257,56 @@ public class TestOrcRawRecordMerger {
job.set("mapred.input.dir", root.toString());
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
- HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+ AcidUtils.setTransactionalTableScan(job,true);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
InputSplit[] splits = inf.getSplits(job, 5);
- assertEquals(5, splits.length);
+ //base has 10 rows, so 5 splits; 1 delta has 2 rows so 1 split, and 1 delta has 3 rows so 2 splits
+ assertEquals(8, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
- // loop through the 5 splits and read each
- for(int i=0; i < 4; ++i) {
- System.out.println("starting split " + i + " = " + splits[i]);
- rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+ for(InputSplit split : splits) {
+ rr = inf.getRecordReader(split, job, Reporter.NULL);
NullWritable key = rr.createKey();
OrcStruct value = rr.createValue();
-
- // there should be exactly two rows per a split
- for(int j=0; j < 2; ++j) {
- System.out.println("i = " + i + ", j = " + j);
- assertEquals(true, rr.next(key, value));
- System.out.println("record = " + value);
- assertEquals(i + "." + j, value.getFieldValue(2).toString());
+ while(rr.next(key, value)) {
+ MyResult mr = new MyResult(Integer.parseInt(value.getFieldValue(0).toString()), value.getFieldValue(2).toString());
+ int i = 0;
+ for(; i < expected.length; i++) {
+ if(mr.equals(expected[i])) {
+ expected[i] = null;
+ break;
+ }
+ }
+ if(i >= expected.length) {
+ //not found
+ assertTrue("Found unexpected row: " + mr, false );
+ }
}
- assertEquals(false, rr.next(key, value));
}
- rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
- assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+ for(MyResult mr : expected) {
+ assertTrue("Expected " + mr + " not found in any InputSplit", mr == null);
+ }
+ }
+ private static final class MyResult {
+ private final int myInt;
+ private final String myText;
+ MyResult(int myInt, String myText) {
+ this.myInt = myInt;
+ this.myText = myText;
+ }
+ @Override
+ public boolean equals(Object t) {
+ if(!(t instanceof MyResult)) {
+ return false;
+ }
+ MyResult that = (MyResult)t;
+ return myInt == that.myInt && myText.equals(that.myText);
+ }
+ @Override
+ public String toString() {
+ return "(" + myInt + "," + myText +")";
+ }
}
-
/**
* Test the RecordReader when there is a new base and a delta.
* @throws Exception
@@ -1081,18 +1332,17 @@ public class TestOrcRawRecordMerger {
.writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
.finalDestination(root);
RecordUpdater ru = of.getRecordUpdater(root, options);
- String[] values = new String[]{"a", "b", "c", "d", "e"};
- for(int i=0; i < values.length; ++i) {
- ru.insert(1, new MyRow(values[i]));
+ String[][] values = {new String[]{"a", "b", "c", "d", "e"}, new String[]{"f", "g", "h", "i", "j"}};
+ for(int i=0; i < values[0].length; ++i) {
+ ru.insert(1, new MyRow(values[0][i]));
}
ru.close(false);
// write a delta
options.minimumTransactionId(2).maximumTransactionId(2);
ru = of.getRecordUpdater(root, options);
- values = new String[]{"f", "g", "h", "i", "j"};
- for(int i=0; i < values.length; ++i) {
- ru.insert(2, new MyRow(values[i]));
+ for(int i=0; i < values[1].length; ++i) {
+ ru.insert(2, new MyRow(values[1][i]));
}
ru.close(false);
@@ -1104,19 +1354,23 @@ public class TestOrcRawRecordMerger {
job.set("bucket_count", "1");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
- HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+ AcidUtils.setTransactionalTableScan(job,true);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
InputSplit[] splits = inf.getSplits(job, 5);
- assertEquals(1, splits.length);
+ assertEquals(2, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
- rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
- values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
- OrcStruct row = rr.createValue();
- for(int i = 0; i < values.length; ++i) {
- System.out.println("Checking " + i);
- assertEquals(true, rr.next(NullWritable.get(), row));
- assertEquals(values[i], row.getFieldValue(0).toString());
+ for(int j = 0; j < splits.length; j++) {
+ InputSplit split = splits[j];
+ rr = inf.getRecordReader(split, job, Reporter.NULL);
+ OrcStruct row = rr.createValue();
+ for (int i = 0; i < values[j].length; ++i) {
+ System.out.println("Checking " + i);
+ String msg = "split[" + j + "] at i=" + i;
+ assertEquals(msg, true, rr.next(NullWritable.get(), row));
+ assertEquals(msg, values[j][i], row.getFieldValue(0).toString());
+ }
+ assertEquals(false, rr.next(NullWritable.get(), row));
}
- assertEquals(false, rr.next(NullWritable.get(), row));
}
/**
@@ -1174,11 +1428,13 @@ public class TestOrcRawRecordMerger {
job.set("bucket_count", "2");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
- HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+ AcidUtils.setTransactionalTableScan(job,true);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
// read the keys before the delta is flushed
InputSplit[] splits = inf.getSplits(job, 1);
- assertEquals(2, splits.length);
+ //1 split since we only have 1 bucket file in base/. delta is not flushed (committed) yet, i.e. empty
+ assertEquals(1, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
inf.getRecordReader(splits[0], job, Reporter.NULL);
NullWritable key = rr.createKey();
@@ -1201,17 +1457,24 @@ public class TestOrcRawRecordMerger {
splits = inf.getSplits(job, 1);
assertEquals(2, splits.length);
- rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
assertEquals(true, fs.exists(sideFile));
- assertEquals(24, fs.getFileStatus(sideFile).getLen());
+ assertEquals(32, fs.getFileStatus(sideFile).getLen());
- for(int i=1; i < 11; ++i) {
+ rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
+ for(int i=1; i <= 5; ++i) {
assertEquals(true, rr.next(key, value));
assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
}
assertEquals(false, rr.next(key, value));
+
+ rr = inf.getRecordReader(splits[1], job, Reporter.NULL);
+ for(int i=6; i < 11; ++i) {
+ assertEquals("i="+ i, true, rr.next(key, value));
+ assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+ }
+ assertEquals(false, rr.next(key, value));
}
}
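One detail worth calling out from the last hunk: the flush-length side file assertion grows from 24 to 32 bytes. The file is read as raw longs (see the side.readLong() calls in TestOrcRecordUpdater below), so at 8 bytes per flush point that is four recorded lengths instead of three. A hedged sketch of that assumed layout:

    // Assumed layout of bucket_NNNNN_flush_length: one 8-byte long per flush.
    FSDataInputStream side = fs.open(sideFile);
    long afterFirstFlush  = side.readLong();
    long afterSecondFlush = side.readLong();  // ... 32 bytes => 4 such entries
    side.close();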
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index be15517..e5c6458 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -125,6 +125,7 @@ public class TestOrcRecordUpdater {
// read the stopping point for the first flush and make sure we only see
// 3 rows
long len = side.readLong();
+ len = side.readLong();
Reader reader = OrcFile.createReader(bucketPath,
new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(len));
assertEquals(3, reader.getNumberOfRows());
@@ -266,28 +267,51 @@ public class TestOrcRecordUpdater {
Reader reader = OrcFile.createReader(bucketPath,
new OrcFile.ReaderOptions(conf).filesystem(fs));
- assertEquals(2, reader.getNumberOfRows());
+ assertEquals(1, reader.getNumberOfRows());
RecordReader rows = reader.rows();
// check the contents of the file
assertEquals(true, rows.hasNext());
OrcStruct row = (OrcStruct) rows.next(null);
- assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+ assertEquals(OrcRecordUpdater.INSERT_OPERATION,
OrcRecordUpdater.getOperation(row));
assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
- assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row));
- assertEquals(20, OrcRecordUpdater.getBucket(row));
- assertEquals(30, OrcRecordUpdater.getRowId(row));
+ assertEquals(100, OrcRecordUpdater.getOriginalTransaction(row));
+ int bucketProperty = OrcRecordUpdater.getBucket(row);
+ assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
+ assertEquals(0, OrcRecordUpdater.getRowId(row));
assertEquals("update",
OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+ rows.close();
+
+ options.writingDeleteDelta(true);
+ bucketPath = AcidUtils.createFilename(root, options);
+ reader = OrcFile.createReader(bucketPath,
+ new OrcFile.ReaderOptions(conf).filesystem(fs));
+ assertEquals(2, reader.getNumberOfRows());
+
+ rows = reader.rows();
assertEquals(true, rows.hasNext());
row = (OrcStruct) rows.next(null);
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(row));
+ assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
+ assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row));
+ bucketProperty = OrcRecordUpdater.getBucket(row);
+ assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
+ assertEquals(30, OrcRecordUpdater.getRowId(row));
+ assertNull(OrcRecordUpdater.getRow(row));
+
+ assertEquals(true, rows.hasNext());
+ row = (OrcStruct) rows.next(null);
+ assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(row));
assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row));
- assertEquals(20, OrcRecordUpdater.getBucket(row));
+ bucketProperty = OrcRecordUpdater.getBucket(row);
+ assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
assertEquals(60, OrcRecordUpdater.getRowId(row));
assertNull(OrcRecordUpdater.getRow(row));
+
assertEquals(false, rows.hasNext());
}
}
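The reworked assertions encode the acid 2.0 split-update layout: update() now lands as an INSERT event in delta_x_y plus a DELETE event (with a null row) in delete_delta_x_y. A minimal sketch of locating both halves, reusing the calls from the hunk above:

    Path insertBucket = AcidUtils.createFilename(root, options); // delta_.../bucket_NNNNN
    options.writingDeleteDelta(true);
    Path deleteBucket = AcidUtils.createFilename(root, options); // delete_delta_.../bucket_NNNNN
    Reader deletes = OrcFile.createReader(deleteBucket,
        new OrcFile.ReaderOptions(conf).filesystem(fs));
    // Every event on the delete side: getOperation(row) == DELETE_OPERATION,
    // getRow(row) == null.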
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 439ec9b..43e0a4a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -246,10 +246,6 @@ public class TestVectorizedOrcAcidRowBatchReader {
@Test
public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception {
OrcSplit mockSplit = Mockito.mock(OrcSplit.class);
- conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
- AcidUtils.AcidOperationalProperties.getLegacy().toInt());
- // Test false when trying to create a vectorized ACID row batch reader for a legacy table.
- assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
AcidUtils.AcidOperationalProperties.getDefault().toInt());
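With acid 2.0 (split-update) now the default, the legacy negative case is simply dropped. A sketch of the distinction, with the legacy value kept only as a comment for contrast:

    // Before this patch the legacy properties forced the non-vectorized path:
    //   conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
    //       AcidUtils.AcidOperationalProperties.getLegacy().toInt());
    //   => canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit) == false
    // With acid 2.0 as the default only this remains:
    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
        AcidUtils.AcidOperationalProperties.getDefault().toInt());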
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index bbed591..7455fa8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -409,6 +409,11 @@ public abstract class CompactorTest {
return null;
}
+ /**
+ * This is bogus, especially with split-update acid tables. It causes compaction to create
+ * delete_delta_x_y where none existed before, producing a data layout that would never be
+ * created by the 'real' code path.
+ */
@Override
public boolean isDelete(Text value) {
// Alternate between returning deleted and not. This is easier than actually
@@ -552,4 +557,7 @@ public abstract class CompactorTest {
String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
}
+ String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
+ return AcidUtils.deleteDeltaSubdir(minTxnId, maxTxnId);
+ }
}
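The new helper mirrors the existing delta one. For reference, a sketch of the directory names the two produce (the zero-padded txn ids are an assumption, based on AcidUtils' usual delta_0000NNN_0000NNN convention):

    String delta       = AcidUtils.deltaSubdir(21, 24);       // e.g. "delta_0000021_0000024"
    String deleteDelta = AcidUtils.deleteDeltaSubdir(21, 24); // e.g. "delete_delta_0000021_0000024"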
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index efd6ed8..8d01543 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -289,7 +289,7 @@ public class TestWorker extends CompactorTest {
CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
txnHandler.compact(rqst);
- startWorker();
+ startWorker();//adds delta and delete_delta
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -299,7 +299,7 @@ public class TestWorker extends CompactorTest {
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ Assert.assertEquals(5, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewDelta = false;
@@ -310,9 +310,19 @@ public class TestWorker extends CompactorTest {
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(208L, buckets[0].getLen());
- Assert.assertEquals(208L, buckets[1].getLen());
- } else {
+ Assert.assertEquals(104L, buckets[0].getLen());
+ Assert.assertEquals(104L, buckets[1].getLen());
+ }
+ if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(2, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertEquals(104L, buckets[0].getLen());
+ Assert.assertEquals(104L, buckets[1].getLen());
+ }
+ else {
LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
}
}
@@ -348,14 +358,15 @@ public class TestWorker extends CompactorTest {
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ Assert.assertEquals(5, stat.length);
// Find the new delta file and make sure it has the right contents
Arrays.sort(stat);
Assert.assertEquals("base_20", stat[0].getPath().getName());
- Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
+ Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
+ Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[2].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
}
@Test
@@ -380,18 +391,19 @@ public class TestWorker extends CompactorTest {
Assert.assertEquals(1, compacts.size());
Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
- // There should still now be 5 directories in the location
+ // There should still now be 6 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(5, stat.length);
+ Assert.assertEquals(6, stat.length);
// Find the new delta file and make sure it has the right contents
Arrays.sort(stat);
Assert.assertEquals("base_20", stat[0].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
- Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[2].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+ Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27), stat[1].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+ Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[3].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
+ Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
}
@Test
@@ -409,7 +421,7 @@ public class TestWorker extends CompactorTest {
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
- startWorker();
+ startWorker();//this will create delta_20_24 and delete_delta_20_24. See MockRawReader
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -419,7 +431,7 @@ public class TestWorker extends CompactorTest {
// There should still be four directories in the location.
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ Assert.assertEquals(5, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewDelta = false;
@@ -430,9 +442,18 @@ public class TestWorker extends CompactorTest {
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(208L, buckets[0].getLen());
- Assert.assertEquals(208L, buckets[1].getLen());
- } else {
+ Assert.assertEquals(104L, buckets[0].getLen());
+ Assert.assertEquals(104L, buckets[1].getLen());
+ }
+ if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(2, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertEquals(104L, buckets[0].getLen());
+ Assert.assertEquals(104L, buckets[1].getLen());
+ } else {
LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
}
}
@@ -462,7 +483,7 @@ public class TestWorker extends CompactorTest {
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(3, stat.length);
+ Assert.assertEquals(4, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewDelta = false;
@@ -473,8 +494,17 @@ public class TestWorker extends CompactorTest {
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(208L, buckets[0].getLen());
- Assert.assertEquals(208L, buckets[1].getLen());
+ Assert.assertEquals(104L, buckets[0].getLen());
+ Assert.assertEquals(104L, buckets[1].getLen());
+ }
+ if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4))) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+ Assert.assertEquals(2, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertEquals(104L, buckets[0].getLen());
+ Assert.assertEquals(104L, buckets[1].getLen());
} else {
LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
}
@@ -534,6 +564,17 @@ public class TestWorker extends CompactorTest {
public void majorNoBaseLotsOfDeltas() throws Exception {
compactNoBaseLotsOfDeltas(CompactionType.MAJOR);
}
+
+ /**
+ * These tests are starting to be a hack. The files written by addDeltaFile() are not proper
+ * Acid files, and the {@link CompactorTest.MockRawReader} performs no merging of delta files and
+ * fakes isDelete() as a shortcut. This makes the files created on disk unrepresentative of
+ * what they would look like in a real system.
+ * Making {@link org.apache.hadoop.hive.ql.txn.compactor.CompactorTest.MockRawReader} do proper
+ * delete event handling would duplicate either OrcRawRecordMerger or VectorizedOrcAcidRowBatchReader.
+ * @param type
+ * @throws Exception
+ */
private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 2);
Table t = newTable("default", "mapwb", true);
@@ -570,45 +611,55 @@ public class TestWorker extends CompactorTest {
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
- Assert.assertEquals(9, stat.length);
+ /* delete_delta_21_23 and delete_delta_25_33 are created as a result of compacting */
+ int numFilesExpected = 11 + (type == CompactionType.MINOR ? 1 : 0);
+ Assert.assertEquals(numFilesExpected, stat.length);
// Find the new delta file and make sure it has the right contents
- BitSet matchesFound = new BitSet(9);
- for (int i = 0; i < stat.length; i++) {
+ BitSet matchesFound = new BitSet(numFilesExpected);
+ for (int i = 0, j = 0; i < stat.length; i++) {
+ if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21,23))) {
+ matchesFound.set(j++);
+ }
+ if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(25,33))) {
+ matchesFound.set(j++);
+ }
if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) {
- matchesFound.set(0);
+ matchesFound.set(j++);
}
else if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) {
- matchesFound.set(1);
+ matchesFound.set(j++);
}
else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 29))) {
- matchesFound.set(2);
+ matchesFound.set(j++);
}
else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 32))) {
- matchesFound.set(3);
+ matchesFound.set(j++);
}
else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 33))) {
- matchesFound.set(4);
+ matchesFound.set(j++);
}
else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) {
- matchesFound.set(5);
+ matchesFound.set(j++);
}
else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) {
- matchesFound.set(6);
+ matchesFound.set(j++);
}
else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) {
- matchesFound.set(7);
+ matchesFound.set(j++);
}
switch (type) {
- //yes, both do set(8)
case MINOR:
if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) {
- matchesFound.set(8);
+ matchesFound.set(j++);
+ }
+ if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 35))) {
+ matchesFound.set(j++);
}
break;
case MAJOR:
if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) {
- matchesFound.set(8);
+ matchesFound.set(j++);
}
break;
default:
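A small note on why the sorted indices shift in the assertions above: the listings are sorted by path, and "delete_delta" orders before "delta" ('e' < 't' at the fourth character), so a compacted delete delta always lands right after base_*. Easy to confirm:

    String[] names = {"delta_0000021_0000022", "delete_delta_0000021_0000027", "base_20"};
    java.util.Arrays.sort(names);
    // -> [base_20, delete_delta_0000021_0000027, delta_0000021_0000022]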
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
index 18408e4..80ee0ba 100644
--- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
@@ -108,7 +108,7 @@ when matched then update set
end_date = date '2017-03-15'
when not matched then insert values
(sub.source_pk, upper(substr(sub.name, 0, 3)), sub.name, sub.state, true, null);
-select * from customer order by source_pk;
+select * from customer order by source_pk, is_current;
drop table customer;
drop table customer_updates;
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
index 78c9070..7f837cc 100644
--- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
@@ -981,11 +981,11 @@ POSTHOOK: Lineage: customer.sk EXPRESSION [(new_customer_stage)stage.FieldSchema
POSTHOOK: Lineage: customer.source_pk SIMPLE [(new_customer_stage)stage.FieldSchema(name:source_pk, type:int, comment:null), ]
POSTHOOK: Lineage: customer.state SIMPLE [(new_customer_stage)stage.FieldSchema(name:state, type:string, comment:null), ]
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(customer)customer.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), ]
-PREHOOK: query: select * from customer order by source_pk
+PREHOOK: query: select * from customer order by source_pk, is_current
PREHOOK: type: QUERY
PREHOOK: Input: type2_scd_helper@customer
#### A masked pattern was here ####
-POSTHOOK: query: select * from customer order by source_pk
+POSTHOOK: query: select * from customer order by source_pk, is_current
POSTHOOK: type: QUERY
POSTHOOK: Input: type2_scd_helper@customer
#### A masked pattern was here ####
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/results/clientpositive/row__id.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/row__id.q.out b/ql/src/test/results/clientpositive/row__id.q.out
index 059ace9..9aab097 100644
--- a/ql/src/test/results/clientpositive/row__id.q.out
+++ b/ql/src/test/results/clientpositive/row__id.q.out
@@ -56,23 +56,23 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: hello_acid
- Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: ROW__ID.transactionid (type: bigint)
outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: bigint)
sort order: +
- Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: bigint)
outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -117,17 +117,17 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: hello_acid
- Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
Filter Operator
predicate: (ROW__ID.transactionid = 3) (type: boolean)
- Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: ROW__ID.transactionid (type: bigint)
outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
[3/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene
Koifman, reviewed by Sergey Shelukhin)
Posted by ek...@apache.org.
HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/34b0e07a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/34b0e07a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/34b0e07a
Branch: refs/heads/master
Commit: 34b0e07a3bd5002b197b749e3e5a7992e196c237
Parents: 5061683
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Aug 15 18:13:44 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Aug 15 18:13:44 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 9 +-
.../hive/hcatalog/streaming/TestStreaming.java | 147 ++++-
.../streaming/mutate/StreamingAssert.java | 45 +-
.../streaming/mutate/TestMutations.java | 17 +-
.../TransactionalValidationListener.java | 2 -
.../hadoop/hive/ql/exec/FileSinkOperator.java | 2 -
.../hadoop/hive/ql/io/AcidInputFormat.java | 6 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 90 ++-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 41 +-
.../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 2 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 41 +-
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 60 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 20 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 186 ++++---
.../ql/TestTxnCommands2WithSplitUpdate.java | 545 -------------------
...ommands2WithSplitUpdateAndVectorization.java | 4 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 31 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 191 ++++---
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 509 ++++++++++++-----
.../hive/ql/io/orc/TestOrcRecordUpdater.java | 36 +-
.../TestVectorizedOrcAcidRowBatchReader.java | 4 -
.../hive/ql/txn/compactor/CompactorTest.java | 8 +
.../hive/ql/txn/compactor/TestWorker.java | 125 +++--
.../dynamic_semijoin_reduction_3.q | 2 +-
.../llap/dynamic_semijoin_reduction_3.q.out | 4 +-
.../test/results/clientpositive/row__id.q.out | 18 +-
26 files changed, 1086 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b154544..3c158a6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1841,12 +1841,9 @@ public class HiveConf extends Configuration {
" of the lock manager is dumped to log file. This is for debugging. See also " +
"hive.lock.numretries and hive.lock.sleep.between.retries."),
- HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
- "Sets the operational properties that control the appropriate behavior for various\n"
- + "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode\n"
- + "for ACID, while setting it to one will enable a split-update feature found in the newer\n"
- + "version of Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
- + "for future versions of ACID. (See HIVE-14035 for details.)"),
+ HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1,
+ "This is intended to be used as an internal property for future versions of ACID. (See\n" +
+ "HIVE-14035 for details.)"),
HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
"current open transactions reach this limit, future open transaction requests will be \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 5e8fe62..f3ef92b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -27,6 +27,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -44,7 +45,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
@@ -97,6 +99,8 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
+
public class TestStreaming {
private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
@@ -448,7 +452,10 @@ public class TestStreaming {
}
}
-
+ /**
+ * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, String...)} - there is
+ * little value in using InputFormat directly
+ */
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
@@ -473,13 +480,14 @@ public class TestStreaming {
InputFormat inf = new OrcInputFormat();
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionPath.toString());
- job.set("bucket_count", Integer.toString(buckets));
+ job.set(BUCKET_COUNT, Integer.toString(buckets));
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
- job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
+ AcidUtils.setTransactionalTableScan(job, true);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
InputSplit[] splits = inf.getSplits(job, buckets);
- Assert.assertEquals(buckets, splits.length);
+ Assert.assertEquals(numExpectedFiles, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
inf.getRecordReader(splits[0], job, Reporter.NULL);
@@ -491,6 +499,48 @@ public class TestStreaming {
}
Assert.assertEquals(false, rr.next(key, value));
}
+ /**
+ * @param validationQuery query to read from table to compare data against {@code records}
+ * @param records expected data. each row is a CSV list of values
+ */
+ private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
+ String validationQuery, boolean vectorize, String... records) throws Exception {
+ ValidTxnList txns = msClient.getValidTxns();
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+ Assert.assertEquals(0, dir.getObsolete().size());
+ Assert.assertEquals(0, dir.getOriginalFiles().size());
+ List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+ System.out.println("Files found: ");
+ for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
+ Assert.assertEquals(numExpectedFiles, current.size());
+
+ // find the absolute min/max transaction ids across the current deltas
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ for (AcidUtils.ParsedDelta pd : current) {
+ if (pd.getMaxTransaction() > max) max = pd.getMaxTransaction();
+ if (pd.getMinTransaction() < min) min = pd.getMinTransaction();
+ }
+ Assert.assertEquals(minTxn, min);
+ Assert.assertEquals(maxTxn, max);
+ boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+ if(vectorize) {
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ }
+
+ String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
+ for(String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) {
+ //run it with each split strategy - the results should not differ
+ conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
+ List<String> actualResult = queryTable(driver, validationQuery);
+ for (int i = 0; i < actualResult.size(); i++) {
+ Assert.assertEquals("diff at [" + i + "]. actual=" + actualResult + " expected=" +
+ Arrays.toString(records), records[i], actualResult.get(i));
+ }
+ }
+ conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
+ }
private void checkNothingWritten(Path partitionPath) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
@@ -1016,15 +1066,15 @@ public class TestStreaming {
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
-
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten2(partLoc, 15, 24, 1, validationQuery, false, "1\tHello streaming");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ checkDataWritten2(partLoc, 15, 24, 1, validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming");
txnBatch.close();
@@ -1034,16 +1084,16 @@ public class TestStreaming {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten2(partLoc, 15, 40, 2, validationQuery, false, "1\tHello streaming",
+ "2\tWelcome to streaming", "3\tHello streaming - once again");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
- "{4, Welcome to streaming - once again}");
+ checkDataWritten2(partLoc, 15, 40, 2, validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming", "3\tHello streaming - once again",
+ "4\tWelcome to streaming - once again");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -1053,7 +1103,6 @@ public class TestStreaming {
connection.close();
}
-
@Test
public void testInterleavedTransactionBatchCommits() throws Exception {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
@@ -1078,32 +1127,69 @@ public class TestStreaming {
txnBatch2.commit();
- checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
+ String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+ checkDataWritten2(partLoc, 24, 33, 1,
+ validationQuery, true, "3\tHello streaming - once again");
txnBatch1.commit();
-
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ /*now both batches have committed (but not closed) so for each primary file we expect a side
+ file to exist that indicates the true length of the primary file*/
+ FileSystem fs = partLoc.getFileSystem(conf);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+ for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+ for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+ Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+ Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+ long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+ Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+ lengthFileSize, lengthFileSize > 0);
+ long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+ long actualLength = stat.getLen();
+ Assert.assertTrue("", logicalLength == actualLength);
+ }
+ }
+ checkDataWritten2(partLoc, 14, 33, 2,
+ validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
txnBatch1.beginNextTransaction();
txnBatch1.write("2,Welcome to streaming".getBytes());
txnBatch2.beginNextTransaction();
txnBatch2.write("4,Welcome to streaming - once again".getBytes());
-
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ //here each batch has written data and committed (to bucket0 since the table only has 1 bucket)
+ //so each of the 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
+ //has now received more data (logically - it's buffered) but it is not yet committed.
+ //let's check that side files exist, etc.
+ dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+ for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+ for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+ Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+ Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+ long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+ Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+ lengthFileSize, lengthFileSize > 0);
+ long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+ long actualLength = stat.getLen();
+ Assert.assertTrue("", logicalLength <= actualLength);
+ }
+ }
+ checkDataWritten2(partLoc, 14, 33, 2,
+ validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
txnBatch1.commit();
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}",
- "{3, Hello streaming - once again}");
+ checkDataWritten2(partLoc, 14, 33, 2,
+ validationQuery, false, "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again");
txnBatch2.commit();
- checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}",
- "{3, Hello streaming - once again}",
- "{4, Welcome to streaming - once again}");
+ checkDataWritten2(partLoc, 14, 33, 2,
+ validationQuery, true, "1\tHello streaming",
+ "2\tWelcome to streaming",
+ "3\tHello streaming - once again",
+ "4\tWelcome to streaming - once again");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch1.getCurrentTransactionState());
@@ -2035,11 +2121,12 @@ public class TestStreaming {
public static ArrayList<String> queryTable(Driver driver, String query)
throws CommandNeedRetryException, IOException {
- driver.run(query);
+ CommandProcessorResponse cpr = driver.run(query);
+ if(cpr.getResponseCode() != 0) {
+ throw new RuntimeException(query + " failed: " + cpr);
+ }
ArrayList<String> res = new ArrayList<String>();
driver.getResults(res);
- if(res.isEmpty())
- System.err.println(driver.getErrorMsg());
return res;
}
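Illustration (not part of the patch): the side-file protocol the assertions above exercise, as a minimal sketch. SideFileDemo/printSideFileState are hypothetical names; OrcAcidUtils.getSideFile() and AcidUtils.getLogicalLength() are the calls used in this diff.

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.orc.impl.OrcAcidUtils;

class SideFileDemo {
  // reports the physical vs logical length of one acid bucket file
  static void printSideFileState(FileSystem fs, Path bucketFile) throws IOException {
    Path sideFile = OrcAcidUtils.getSideFile(bucketFile);
    if (!fs.exists(sideFile)) {
      // writer closed cleanly, so the length from the NameNode is authoritative
      System.out.println(bucketFile + " has no side file; fully committed");
      return;
    }
    FileStatus stat = fs.getFileStatus(bucketFile);
    // logical length = length as of the last commit; the physical length may be larger
    long logical = AcidUtils.getLogicalLength(fs, stat);
    System.out.println(bucketFile + ": physical=" + stat.getLen() + " logical=" + logical);
  }
}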
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index de41d34..d5429fb 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -27,12 +27,12 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -123,35 +123,50 @@ public class StreamingAssert {
}
List<Record> readRecords() throws Exception {
+ return readRecords(1);
+ }
+
+ /**
+ * TODO: this would be more flexible as a SQL select statement rather than using InputFormat directly;
+ * see {@link org.apache.hive.hcatalog.streaming.TestStreaming#checkDataWritten2(Path, long, long, int, String, String...)}
+ * @param numSplitsExpected
+ * @return
+ * @throws Exception
+ */
+ List<Record> readRecords(int numSplitsExpected) throws Exception {
if (currentDeltas.isEmpty()) {
throw new AssertionError("No data");
}
InputFormat<NullWritable, OrcStruct> inputFormat = new OrcInputFormat();
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionLocation.toString());
- job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+ job.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(table.getSd().getNumBuckets()));
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
- job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
+ AcidUtils.setTransactionalTableScan(job, true);
+ job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
InputSplit[] splits = inputFormat.getSplits(job, 1);
- assertEquals(1, splits.length);
-
- final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
- .getRecordReader(splits[0], job, Reporter.NULL);
+ assertEquals(numSplitsExpected, splits.length);
- NullWritable key = recordReader.createKey();
- OrcStruct value = recordReader.createValue();
List<Record> records = new ArrayList<>();
- while (recordReader.next(key, value)) {
- RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
- Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+ for(InputSplit is : splits) {
+ final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
+ .getRecordReader(is, job, Reporter.NULL);
+
+ NullWritable key = recordReader.createKey();
+ OrcStruct value = recordReader.createValue();
+
+ while (recordReader.next(key, value)) {
+ RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
+ Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString());
- System.out.println(record);
- records.add(record);
+ System.out.println(record);
+ records.add(record);
+ }
+ recordReader.close();
}
- recordReader.close();
return records;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
index ab9f313..5bfa04d 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
@@ -529,22 +529,23 @@ public class TestMutations {
StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
indiaAssertions.assertMinTransactionId(1L);
indiaAssertions.assertMaxTransactionId(2L);
- List<Record> indiaRecords = indiaAssertions.readRecords();
+ List<Record> indiaRecords = indiaAssertions.readRecords(2);
assertThat(indiaRecords.size(), is(3));
assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
encodeBucket(0), 0L)));
assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}"));
- assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L,
- encodeBucket(0), 1L)));
+ assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(2L,
+ encodeBucket(0), 0L)));//with split update, the new version of the row is a new insert
assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L,
- encodeBucket(0), 0L)));
+ encodeBucket(0), 1L)));
StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
ukAssertions.assertMinTransactionId(1L);
ukAssertions.assertMaxTransactionId(2L);
- List<Record> ukRecords = ukAssertions.readRecords();
+ //1 split since mutateTransaction txn just does deletes
+ List<Record> ukRecords = ukAssertions.readRecords(1);
assertThat(ukRecords.size(), is(1));
assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
@@ -553,11 +554,11 @@ public class TestMutations {
StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
franceAssertions.assertMinTransactionId(1L);
franceAssertions.assertMaxTransactionId(2L);
- List<Record> franceRecords = franceAssertions.readRecords();
+ List<Record> franceRecords = franceAssertions.readRecords(2);
assertThat(franceRecords.size(), is(1));
assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}"));
- assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
- encodeBucket(0), 1L)));
+ assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(2L,
+ encodeBucket(0), 0L)));//with split update, the new version of the row is a new insert
client.close();
}
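Illustration (not part of the patch): under split-update an UPDATE becomes a delete event plus a fresh insert, which is why the expected RecordIdentifiers above changed. A minimal sketch; the class name and the sample bucketProperty value are hypothetical.

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

public class SplitUpdateIdDemo {
  public static void main(String[] args) {
    int bucketProperty = 536870912; // assumed V1-encoded bucket 0
    // row originally written by txn 1 as rowId 1
    RecordIdentifier original = new RecordIdentifier(1L, bucketProperty, 1L);
    // after an UPDATE in txn 2: a delete event in delete_delta_2_2 references 'original'
    // and the new row version is a fresh insert in delta_2_2, numbered from rowId 0
    RecordIdentifier updated = new RecordIdentifier(2L, bucketProperty, 0L);
    System.out.println(original + " -> " + updated);
  }
}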
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 0f08f43..023d703 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -39,7 +39,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
// These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
- public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
TransactionalValidationListener(Configuration conf) {
super(conf);
@@ -276,7 +275,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
boolean isValid = false;
switch (transactionalProperties) {
case DEFAULT_TRANSACTIONAL_PROPERTY:
- case LEGACY_TRANSACTIONAL_PROPERTY:
isValid = true;
break;
default:
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8999f6f..25ad1e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -148,8 +148,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
RecordWriter[] outWriters;
RecordUpdater[] updaters;
Stat stat;
- int acidLastBucket = -1;
- int acidFileOffset = -1;
public FSPaths(Path specPath) {
tmpPath = Utilities.toTempPath(specPath);
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index 25177ef..9864c76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -112,6 +112,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
private long minTxnId;
private long maxTxnId;
private List<Integer> stmtIds;
+ //it would be useful to have an enum for Type: insert/delete/load data
public DeltaMetaData() {
this(0,0,new ArrayList<Integer>());
@@ -155,6 +156,11 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
stmtIds.add(in.readInt());
}
}
+ @Override
+ public String toString() {
+ //'?' is a placeholder for Type - once implemented
+ return "Delta(?," + minTxnId + "," + maxTxnId + "," + stmtIds + ")";
+ }
}
/**
* Options for controlling the record readers.
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 1e33424..feacdd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -39,11 +39,13 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.Ref;
+import org.apache.orc.impl.OrcAcidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +69,18 @@ public class AcidUtils {
};
public static final String DELTA_PREFIX = "delta_";
public static final String DELETE_DELTA_PREFIX = "delete_delta_";
+ /**
+ * Acid Streaming Ingest writes multiple transactions to the same file. It also maintains a
+ * {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} side file which stores the length of
+ * the primary file as of the last commit ({@link OrcRecordUpdater#flush()}). That is the 'logical length'.
+ * Once the primary is closed, the side file is deleted (logical length = actual length) but if
+ * the writer dies or the primary file is being read while it's still being written to, anything
+ * past the logical length should be ignored.
+ *
+ * @see org.apache.orc.impl.OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX
+ * @see org.apache.orc.impl.OrcAcidUtils#getLastFlushLength(FileSystem, Path)
+ * @see #getLogicalLength(FileSystem, FileStatus)
+ */
public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
public static final PathFilter deltaFileFilter = new PathFilter() {
@Override
@@ -167,7 +181,7 @@ public class AcidUtils {
* This is format of delete delta dir name prior to Hive 2.2.x
*/
@VisibleForTesting
- static String deleteDeltaSubdir(long min, long max) {
+ public static String deleteDeltaSubdir(long min, long max) {
return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
String.format(DELTA_DIGITS, max);
}
@@ -178,7 +192,7 @@ public class AcidUtils {
* @since 2.2.x
*/
@VisibleForTesting
- static String deleteDeltaSubdir(long min, long max, int statementId) {
+ public static String deleteDeltaSubdir(long min, long max, int statementId) {
return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
}
@@ -371,21 +385,10 @@ public class AcidUtils {
public static final int HASH_BASED_MERGE_BIT = 0x02;
public static final String HASH_BASED_MERGE_STRING = "hash_merge";
public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
- public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY;
private AcidOperationalProperties() {
}
- /**
- * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables
- * that were created before ACID type system using operational properties was put in place.
- * @return the acidOperationalProperties object
- */
- public static AcidOperationalProperties getLegacy() {
- AcidOperationalProperties obj = new AcidOperationalProperties();
- // In legacy mode, none of these properties are turned on.
- return obj;
- }
/**
* Returns an acidOperationalProperties object that represents default ACID behavior for tables
@@ -406,14 +409,11 @@ public class AcidUtils {
*/
public static AcidOperationalProperties parseString(String propertiesStr) {
if (propertiesStr == null) {
- return AcidOperationalProperties.getLegacy();
+ return AcidOperationalProperties.getDefault();
}
if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
return AcidOperationalProperties.getDefault();
}
- if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) {
- return AcidOperationalProperties.getLegacy();
- }
AcidOperationalProperties obj = new AcidOperationalProperties();
String[] options = propertiesStr.split("\\|");
for (String option : options) {
@@ -1119,7 +1119,12 @@ public class AcidUtils {
public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
}
-
+ /**
+ * @param p - not null
+ */
+ public static boolean isDeleteDelta(Path p) {
+ return p.getName().startsWith(DELETE_DELTA_PREFIX);
+ }
/** Checks if a table is a valid ACID table.
* Note, users are responsible for using the correct TxnManager. We do not look at
* SessionState.get().getTxnMgr().supportsAcid() here
@@ -1171,8 +1176,8 @@ public class AcidUtils {
String transactionalProperties = table.getProperty(
hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (transactionalProperties == null) {
- // If the table does not define any transactional properties, we return a legacy type.
- return AcidOperationalProperties.getLegacy();
+ // If the table does not define any transactional properties, we return a default type.
+ return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(transactionalProperties);
}
@@ -1184,7 +1189,7 @@ public class AcidUtils {
*/
public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
// If the conf does not define any transactional properties, the parseInt() should receive
- // a value of zero, which will set AcidOperationalProperties to a legacy type and return that.
+ // a value of 1, which will set AcidOperationalProperties to a default type and return that.
return AcidOperationalProperties.parseInt(
HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
}
@@ -1197,8 +1202,8 @@ public class AcidUtils {
public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (resultStr == null) {
- // If the properties does not define any transactional properties, we return a legacy type.
- return AcidOperationalProperties.getLegacy();
+ // If the properties does not define any transactional properties, we return a default type.
+ return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(resultStr);
}
@@ -1212,9 +1217,44 @@ public class AcidUtils {
Map<String, String> parameters) {
String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (resultStr == null) {
- // If the parameters does not define any transactional properties, we return a legacy type.
- return AcidOperationalProperties.getLegacy();
+ // If the parameters does not define any transactional properties, we return a default type.
+ return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(resultStr);
}
+ /**
+ * See comments at {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX}.
+ *
+ * Returns the logical end of file for an acid data file.
+ *
+ * This relies on the fact that if delta_x_y has no committed transactions it will be filtered out
+ * by {@link #getAcidState(Path, Configuration, ValidTxnList)} and so won't be read at all.
+ * @param file - data file to read/compute splits on
+ */
+ public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
+ Path lengths = OrcAcidUtils.getSideFile(file.getPath());
+ if(!fs.exists(lengths)) {
+ /**
+ * if we are here for delta_x_y, that means txn y is resolved and all files in this delta are closed, so
+ * they should all have a valid ORC footer and the length info from the NameNode is good
+ */
+ return file.getLen();
+ }
+ long len = OrcAcidUtils.getLastFlushLength(fs, file.getPath());
+ if(len >= 0) {
+ /**
+ * if we are here, something is still writing to delta_x_y, so read only as far as the last commit,
+ * i.e. where the last footer was written. The file may contain more data past the 'len' position
+ * belonging to an open txn.
+ */
+ return len;
+ }
+ /**
+ * if we are here, the side file exists but we couldn't read it. We want to avoid a read where
+ * file.getLen() < 'value from side file' (which may happen if the file is not closed) because this
+ * means some committed data from 'file' would be skipped.
+ * This should be very unusual.
+ */
+ throw new IOException(lengths + " found but is not readable. Consider waiting or orcfiledump --recover");
+ }
}
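Illustration (not part of the patch): callers should bound reads by getLogicalLength() rather than FileStatus.getLen(); the OrcInputFormat hunks below do exactly this when creating splits and readers. A minimal sketch; openAcidAware is a hypothetical name.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

class LogicalLengthDemo {
  static Reader openAcidAware(FileSystem fs, Path path, Configuration conf) throws IOException {
    FileStatus stat = fs.getFileStatus(path);
    // cap the reader at the last committed footer rather than the raw file length
    return OrcFile.createReader(path, OrcFile.readerOptions(conf)
        .filesystem(fs)
        .maxLength(AcidUtils.getLogicalLength(fs, stat)));
  }
}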
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index de49fc8..17f3d02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -641,6 +642,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(),
// & therefore we should be able to retrieve them here and determine appropriate behavior.
// Note that this will be meaningless for non-acid tables & will be set to null.
+ //this is set by Utilities.copyTablePropertiesToConf()
boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
this.acidOperationalProperties = isTableTransactional ?
@@ -972,16 +974,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<OrcSplit> splits = Lists.newArrayList();
for (HdfsFileStatusWithId file : fileStatuses) {
FileStatus fileStatus = file.getFileStatus();
- if (fileStatus.getLen() != 0) {
+ long logicalLen = AcidUtils.getLogicalLength(fs, fileStatus);
+ if (logicalLen != 0) {
Object fileKey = file.getFileId();
if (fileKey == null && allowSyntheticFileIds) {
fileKey = new SyntheticFileId(fileStatus);
}
TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
+ if(entry.getKey() + entry.getValue().getLength() > logicalLen) {
+ //don't create splits for anything past logical EOF
+ continue;
+ }
OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
- deltas, -1, fileStatus.getLen());
+ deltas, -1, logicalLen);
splits.add(orcSplit);
}
}
@@ -1002,13 +1009,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* ACID split strategy is used when there is no base directory (when transactions are enabled).
*/
static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
- Path dir;
- List<DeltaMetaData> deltas;
- boolean[] covered;
- int numBuckets;
- AcidOperationalProperties acidOperationalProperties;
+ private Path dir;
+ private List<DeltaMetaData> deltas;
+ private boolean[] covered;
+ private int numBuckets;
+ private AcidOperationalProperties acidOperationalProperties;
- public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
+ ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
AcidOperationalProperties acidOperationalProperties) {
this.dir = dir;
this.numBuckets = numBuckets;
@@ -1027,18 +1034,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// with valid user payload data has already been considered as base for the covered buckets.
// Hence, the uncovered buckets do not have any relevant data and we can just ignore them.
if (acidOperationalProperties != null && acidOperationalProperties.isSplitUpdate()) {
- return splits; // return an empty list.
+ return Collections.emptyList();
}
// Generate a split for any buckets that weren't covered.
// This happens in the case where a bucket just has deltas and no
// base.
if (!deltas.isEmpty()) {
- for (int b = 0; b < numBuckets; ++b) {
- if (!covered[b]) {
- splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1, -1));
- }
- }
+ //since HIVE-17089, if we are here then it's not an acid table, so there should never be any deltas
+ throw new IllegalStateException("Found unexpected deltas: " + deltas + " in " + dir);
}
return splits;
}
@@ -1133,7 +1137,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (val == null || val) {
try {
List<HdfsFileStatusWithId> insertDeltaFiles =
- SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+ SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
}
@@ -1149,7 +1153,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
// Fall back to regular API and create statuses without ID.
- List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
for (FileStatus child : children) {
HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
@@ -1402,7 +1406,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
populateAndCacheStripeDetails();
boolean[] includeStripe = null;
// We can't eliminate stripes if there are deltas because the
- // deltas may change the rows making them match the predicate.
+ // deltas may change the rows making them match the predicate. TODO: see HIVE-14516.
if ((deltas == null || deltas.isEmpty()) && context.sarg != null) {
String[] colNames =
extractNeededColNames((readerTypes == null ? fileTypes : readerTypes),
@@ -1516,7 +1520,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Reader orcReader = OrcFile.createReader(file.getPath(),
OrcFile.readerOptions(context.conf)
.filesystem(fs)
- .maxLength(file.getLen()));
+ .maxLength(AcidUtils.getLogicalLength(fs, file)));
orcTail = new OrcTail(orcReader.getFileTail(), orcReader.getSerializedFileFooter(),
file.getModificationTime());
if (context.cacheStripeDetails) {
@@ -2210,7 +2214,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
totalFileSize += child.getFileStatus().getLen();
AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
(child.getFileStatus().getPath(), context.conf);
- opts.writingBase(true);
int b = opts.getBucketId();
// If the bucket is in the valid range, mark it as covered.
// I wish Hive actually enforced bucketing all of the time.
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index a179300..214f22a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -296,7 +296,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
.rowIndexStride(0);
}
final OrcRecordUpdater.KeyIndexBuilder watcher =
- new OrcRecordUpdater.KeyIndexBuilder();
+ new OrcRecordUpdater.KeyIndexBuilder("compactor");
opts.inspector(options.getInspector())
.callback(watcher);
final Writer writer = OrcFile.createWriter(filename, opts);
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 814782a..97c4e3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -24,6 +24,7 @@ import java.util.TreeMap;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.orc.OrcUtils;
import org.apache.orc.StripeInformation;
@@ -310,11 +311,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
OrcStruct nextRecord;
private final ReaderKey key;
final int bucketId;
+ final int bucketProperty;
- OriginalReaderPair(ReaderKey key, int bucketId) throws IOException {
+ OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException {
this.key = key;
this.bucketId = bucketId;
assert bucketId >= 0 : "don't support non-bucketed tables yet";
+ this.bucketProperty = encodeBucketId(conf, bucketId);
}
@Override public final OrcStruct nextRecord() {
return nextRecord;
@@ -348,7 +351,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
new LongWritable(0));
nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
- new IntWritable(bucketId));
+ new IntWritable(bucketProperty));
nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
new LongWritable(nextRowId));
nextRecord().setFieldValue(OrcRecordUpdater.ROW,
@@ -360,7 +363,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
.set(0);
((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
- .set(bucketId);
+ .set(bucketProperty);
((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
.set(0);
((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
@@ -368,7 +371,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
nextRecord().setFieldValue(OrcRecordUpdater.ROW,
getRecordReader().next(OrcRecordUpdater.getRow(next)));
}
- key.setValues(0L, bucketId, nextRowId, 0L, 0);
+ key.setValues(0L, bucketProperty, nextRowId, 0L, 0);
if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("key " + key + " > maxkey " + getMaxKey());
@@ -380,6 +383,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return false;//reached EndOfFile
}
}
+ static int encodeBucketId(Configuration conf, int bucketId) {
+ return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId));
+ }
@VisibleForTesting
final static class OriginalReaderPairToRead extends OriginalReaderPair {
private final long rowIdOffset;
@@ -392,7 +398,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
final RecordIdentifier minKey, final RecordIdentifier maxKey,
Reader.Options options, Options mergerOptions, Configuration conf,
ValidTxnList validTxnList) throws IOException {
- super(key, bucketId);
+ super(key, bucketId, conf);
this.reader = reader;
assert !mergerOptions.isCompacting();
assert mergerOptions.getRootPath() != null : "Since we have original files";
@@ -444,8 +450,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if (rowIdOffset > 0) {
//rowIdOffset could be 0 if all files before current one are empty
/**
- * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
- * int, Reader.Options)} need to fix min/max key since these are used by
+ * Since we have already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)}
+ * we need to fix the min/max key since these are used by
* {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
* the key. Clear? */
if (minKey != null) {
@@ -455,7 +461,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* If this is not the 1st file, set minKey 1 less than the start of current file
* (Would not need to set minKey if we knew that there are no delta files)
* {@link #advanceToMinKey()} needs this */
- newMinKey = new RecordIdentifier(0, bucketId, rowIdOffset - 1);
+ newMinKey = new RecordIdentifier(0, bucketProperty, rowIdOffset - 1);
}
if (maxKey != null) {
maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
@@ -485,7 +491,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* of the file so we want to leave it blank to make sure any insert events in delta
* files are included; Conversely, if it's not the last file, set the maxKey so that
* events from deltas that don't modify anything in the current split are excluded*/
- newMaxKey = new RecordIdentifier(0, bucketId,
+ newMaxKey = new RecordIdentifier(0, bucketProperty,
rowIdOffset + reader.getNumberOfRows() - 1);
}
this.minKey = newMinKey;
@@ -536,7 +542,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
OriginalReaderPairToCompact(ReaderKey key, int bucketId,
Reader.Options options, Options mergerOptions, Configuration conf,
ValidTxnList validTxnList) throws IOException {
- super(key, bucketId);
+ super(key, bucketId, conf);
assert mergerOptions.isCompacting() : "Should only be used for Compaction";
this.conf = conf;
this.options = options;
@@ -651,8 +657,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* @throws IOException
*/
private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
- Reader.Options options
- ) throws IOException {
+ Reader.Options options,
+ Configuration conf) throws IOException {
long rowLength = 0;
long rowOffset = 0;
long offset = options.getOffset();//this would usually be at block boundary
@@ -660,6 +666,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean isTail = true;
RecordIdentifier minKey = null;
RecordIdentifier maxKey = null;
+ int bucketProperty = encodeBucketId(conf, bucket);
/**
* options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
* necessarily match stripe boundary. So we want to come up with minKey to be one before the 1st
@@ -679,10 +686,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
}
if (rowOffset > 0) {
- minKey = new RecordIdentifier(0, bucket, rowOffset - 1);
+ minKey = new RecordIdentifier(0, bucketProperty, rowOffset - 1);
}
if (!isTail) {
- maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1);
+ maxKey = new RecordIdentifier(0, bucketProperty, rowOffset + rowLength - 1);
}
return new KeyInterval(minKey, maxKey);
}
@@ -829,7 +836,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
KeyInterval keyInterval;
// find the min/max based on the offset and length (and more for 'original')
if (isOriginal) {
- keyInterval = discoverOriginalKeyBounds(reader, bucket, options);
+ keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
} else {
keyInterval = discoverKeyBounds(reader, options);
}
@@ -865,6 +872,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
eventOptions.range(0, Long.MAX_VALUE);
if (deltaDirectory != null) {
for(Path delta: deltaDirectory) {
+ if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) {
+ //for a normal read all inserts should be in the baseReader, so this should always be a delete delta if not compacting
+ throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
+ }
ReaderKey key = new ReaderKey();
Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
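Illustration (not part of the patch): what encodeBucketId() produces. A minimal sketch; it assumes BucketCodec.V1.decodeWriterId(), the decoding counterpart of the encode() call above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;

public class BucketCodecDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int bucketId = 10;
    // same computation as encodeBucketId() above
    int bucketProperty = BucketCodec.V1.encode(
        new AcidOutputFormat.Options(conf).bucket(bucketId));
    // the physical bucket (writer) id round-trips through the encoding
    System.out.println(bucketProperty + " -> bucket " + BucketCodec.V1.decodeWriterId(bucketProperty));
  }
}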
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index c30e8fe..429960b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -90,6 +90,7 @@ public class OrcRecordUpdater implements RecordUpdater {
private Path deleteEventPath;
private final FileSystem fs;
private OrcFile.WriterOptions writerOptions;
+ private OrcFile.WriterOptions deleteWriterOptions;
private Writer writer = null;
private boolean writerClosed = false;
private Writer deleteEventWriter = null;
@@ -104,7 +105,7 @@ public class OrcRecordUpdater implements RecordUpdater {
// This records how many rows have been inserted or deleted. It is separate from insertedRows
// because that is monotonically increasing to give new unique row ids.
private long rowCountDelta = 0;
- private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+ private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder("insert");
private KeyIndexBuilder deleteEventIndexBuilder;
private StructField recIdField = null; // field to look for the record identifier in
private StructField rowIdField = null; // field inside recId to look for row id in
@@ -148,20 +149,23 @@ public class OrcRecordUpdater implements RecordUpdater {
/**
* An extension to AcidOutputFormat that allows users to add additional
* options.
+ *
+ * TODO: since this is only used for testing, could we not control the writer some other way,
+ * to simplify {@link #OrcRecordUpdater(Path, AcidOutputFormat.Options)}?
*/
- public static class OrcOptions extends AcidOutputFormat.Options {
+ final static class OrcOptions extends AcidOutputFormat.Options {
OrcFile.WriterOptions orcOptions = null;
- public OrcOptions(Configuration conf) {
+ OrcOptions(Configuration conf) {
super(conf);
}
- public OrcOptions orcOptions(OrcFile.WriterOptions opts) {
+ OrcOptions orcOptions(OrcFile.WriterOptions opts) {
this.orcOptions = opts;
return this;
}
- public OrcFile.WriterOptions getOrcOptions() {
+ OrcFile.WriterOptions getOrcOptions() {
return orcOptions;
}
}
@@ -205,6 +209,7 @@ public class OrcRecordUpdater implements RecordUpdater {
this.acidOperationalProperties =
AcidUtils.getAcidOperationalProperties(options.getConfiguration());
}
+ assert this.acidOperationalProperties.isSplitUpdate() : "HIVE-17089?!";
BucketCodec bucketCodec = BucketCodec.V1;
if(options.getConfiguration() != null) {
//so that we can test "old" files
@@ -240,6 +245,8 @@ public class OrcRecordUpdater implements RecordUpdater {
&& !options.isWritingBase()){
flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8,
options.getReporter());
+ flushLengths.writeLong(0);
+ OrcInputFormat.SHIMS.hflush(flushLengths);
} else {
flushLengths = null;
}
@@ -265,12 +272,26 @@ public class OrcRecordUpdater implements RecordUpdater {
optionsCloneForDelta.getConfiguration());
}
if (this.acidOperationalProperties.isSplitUpdate()) {
+ AcidOutputFormat.Options deleteOptions = options.clone().writingDeleteDelta(true);
// If this is a split-update, we initialize a delete delta file path in anticipation that
// they would write update/delete events to that separate file.
// This writes to a file in directory which starts with "delete_delta_..."
- // The actual initialization of a writer only happens if any delete events are written.
- this.deleteEventPath = AcidUtils.createFilename(path,
- optionsCloneForDelta.writingDeleteDelta(true));
+ // The actual initialization of a writer only happens if any delete events are
+ // written, to avoid creating empty files.
+ this.deleteEventPath = AcidUtils.createFilename(path, deleteOptions);
+ /**
+ * HIVE-14514 is not done yet, so we can't clone writerOptions(). Instead we create a new
+ * options object here to make sure the insert and delete writers don't share state (the
+ * callback object, for example).
+ * In any case the insert writer and the delete writer have very different characteristics -
+ * the delete writer only writes a tiny amount of data. Once we do early update split, each
+ * {@link OrcRecordUpdater} will have only one writer (except for the Mutate API); at that
+ * point it might make sense to take writerOptions as input - but how?
+ */
+ this.deleteWriterOptions = OrcFile.writerOptions(optionsCloneForDelta.getTableProperties(),
+ optionsCloneForDelta.getConfiguration());
+ this.deleteWriterOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+ options.getRecordIdColumn())));
}
// get buffer size and stripe size for base writer
@@ -377,19 +398,10 @@ public class OrcRecordUpdater implements RecordUpdater {
recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
// Initialize a deleteEventWriter if not yet done. (Lazy initialization)
if (deleteEventWriter == null) {
- // Initialize an indexBuilder for deleteEvents.
- deleteEventIndexBuilder = new KeyIndexBuilder();
- // Change the indexBuilder callback too for the deleteEvent file, the remaining writer
- // options remain the same.
-
- // TODO: When we change the callback, we are essentially mutating the writerOptions.
- // This works but perhaps is not a good thing. The proper way to do this would be
- // to clone the writerOptions, however it requires that the parent OrcFile.writerOptions
- // implements a clone() method (which it does not for now). HIVE-14514 is currently an open
- // JIRA to fix this.
-
+ // Initialize an indexBuilder for deleteEvents. (HIVE-17284)
+ deleteEventIndexBuilder = new KeyIndexBuilder("delete");
this.deleteEventWriter = OrcFile.createWriter(deleteEventPath,
- writerOptions.callback(deleteEventIndexBuilder));
+ deleteWriterOptions.callback(deleteEventIndexBuilder));
}
// A delete/update generates a delete event for the original row.
@@ -461,6 +473,8 @@ public class OrcRecordUpdater implements RecordUpdater {
long len = writer.writeIntermediateFooter();
flushLengths.writeLong(len);
OrcInputFormat.SHIMS.hflush(flushLengths);
+ //multiple transactions only happen for streaming ingest, which only allows inserts
+ assert deleteEventWriter == null : "unexpected delete writer for " + path;
}
@Override
@@ -539,12 +553,16 @@ public class OrcRecordUpdater implements RecordUpdater {
}
static class KeyIndexBuilder implements OrcFile.WriterCallback {
- StringBuilder lastKey = new StringBuilder();
+ private final String builderName;
+ StringBuilder lastKey = new StringBuilder();//list of last keys for each stripe
long lastTransaction;
int lastBucket;
long lastRowId;
AcidStats acidStats = new AcidStats();
+ KeyIndexBuilder(String name) {
+ this.builderName = name;
+ }
@Override
public void preStripeWrite(OrcFile.WriterContext context
) throws IOException {
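A minimal, self-contained sketch of the writer pattern introduced above, using
hypothetical stand-in types rather than Hive's API: the delete side gets its own
options object and its own index-builder callback, created lazily on the first
delete event, so that no empty delete_delta file is produced and the insert
writer's options are never mutated (cloning them is blocked on HIVE-14514):

  import java.util.function.Consumer;

  final class SplitUpdateWriterDemo {
    static final class Options {                       // stand-in for OrcFile.WriterOptions
      Consumer<String> callback;
      Options callback(Consumer<String> cb) { this.callback = cb; return this; }
    }

    private final Options insertOptions =
        new Options().callback(k -> System.out.println("insert index: " + k));
    private Options deleteOptions;                     // separate object, never shared
    private Consumer<String> deleteWriter;             // stand-in for the lazily created ORC writer

    void insert(String rowId) { insertOptions.callback.accept(rowId); }

    void delete(String rowId) {
      if (deleteWriter == null) {                      // lazy init: no deletes => no delete_delta
        deleteOptions = new Options().callback(k -> System.out.println("delete index: " + k));
        deleteWriter = deleteOptions.callback;
      }
      deleteWriter.accept(rowId);
    }

    public static void main(String[] args) {
      SplitUpdateWriterDemo u = new SplitUpdateWriterDemo();
      u.insert("{txn=14, bucket=536936448, row=0}");   // an update is a delete plus an insert
      u.delete("{txn=0, bucket=536936448, row=0}");
    }
  }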
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c50c1a8..bff9884 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -859,15 +861,19 @@ public class TestTxnCommands {
for(String s : rs) {
LOG.warn(s);
}
+ Assert.assertEquals(536870912,
+ BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
+ Assert.assertEquals(536936448,
+ BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
Assert.assertEquals("", 4, rs.size());
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
- Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
+ Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000014_0000014_0000/bucket_00001"));
//run Compaction
runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
@@ -877,11 +883,11 @@ public class TestTxnCommands {
LOG.warn(s);
}
Assert.assertEquals("", 4, rs.size());
- Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+ Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
- Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+ Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
- Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+ Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
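A hedged note on the two magic numbers asserted above: in BucketCodec.V1 the codec
version is stored in the high bits and the bucket id starts at bit 16, so with a
zero statement id the encoded bucket property works out to (1 << 29) | (bucketId << 16).
A self-contained sketch reproducing the asserted values (illustrative only; the
authoritative layout lives in BucketCodec):

  public class BucketPropertyDemo {
    // version-1 marker in the high bits, bucket id shifted to bit 16,
    // statement id (low bits) left as zero
    static int encodeV1(int bucketId) {
      return (1 << 29) | (bucketId << 16);
    }
    public static void main(String[] args) {
      System.out.println(encodeV1(0));  // 536870912, as asserted for bucket 0
      System.out.println(encodeV1(1));  // 536936448, as asserted for bucket 1
    }
  }

This is also why the expected ROW__IDs in the test changed from raw bucket numbers
(0, 1) to their encoded forms: the acid 2.0 reader reports the encoded bucket
property rather than the bucket number itself.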
[2/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)
Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 408c089..0e0fca3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -64,16 +65,11 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.validation.constraints.AssertTrue;
-
-/**
- * TODO: this should be merged with TestTxnCommands once that is checked in
- * specifically the tests; the supporting code here is just a clone of TestTxnCommands
- */
public class TestTxnCommands2 {
static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands2.class);
protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
@@ -88,7 +84,7 @@ public class TestTxnCommands2 {
protected HiveConf hiveConf;
protected Driver d;
- protected static enum Table {
+ protected enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart", "p"),
NONACIDORCTBL("nonAcidOrcTbl"),
@@ -113,6 +109,8 @@ public class TestTxnCommands2 {
this.partitionColumns = partitionColumns;
}
}
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp() throws Exception {
@@ -357,20 +355,22 @@ public class TestTxnCommands2 {
for(String s : rs) {
LOG.warn(s);
}
+ Assert.assertEquals(536870912, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
+ Assert.assertEquals(536936448, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
/*
* All ROW__IDs are unique on read after conversion to acid
* ROW__IDs are exactly the same before and after compaction
- * Also check the file name after compaction for completeness
+ * Also check the file name (only) after compaction for completeness
*/
String[][] expected = {
- {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13", "bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"},
{"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
{"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
- {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"},
- {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2", "bucket_00001"},
- {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4", "bucket_00001"},
- {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5", "bucket_00001"},
- {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6", "bucket_00001"},
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6", "bucket_00001"},
{"{\"transactionid\":18,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
};
Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
@@ -392,9 +392,20 @@ public class TestTxnCommands2 {
}
//make sure they are the same before and after compaction
}
-
/**
- * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+ * In the current implementation of ACID, altering the value of transactional_properties, or
+ * setting a value where one was previously unset, will throw an exception for an acid table.
+ * @throws Exception
+ */
+ @Test
+ public void testFailureOnAlteringTransactionalProperties() throws Exception {
+ expectedException.expect(RuntimeException.class);
+ expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
+ runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+ }
+ /**
+ * Test the query correctness and directory layout for ACID table conversion
* 1. Insert a row to Non-ACID table
* 2. Convert Non-ACID to ACID table
* 3. Insert a row to ACID table
@@ -410,7 +421,7 @@ public class TestTxnCommands2 {
// 1. Insert a row to Non-ACID table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 original bucket files in the location (000000_0 and 000001_0)
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
@@ -426,7 +437,7 @@ public class TestTxnCommands2 {
// 2. Convert NONACIDORCTBL to ACID table
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// Everything should be same as before
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
@@ -442,24 +453,23 @@ public class TestTxnCommands2 {
// 3. Insert another row to newly-converted ACID table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
- // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
+ // The delta directory should also have only 1 bucket file (bucket_00001)
Assert.assertEquals(3, status.length);
boolean sawNewDelta = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("delta_.*")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertEquals(1, buckets.length); // only one bucket file
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
} else {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
}
Assert.assertTrue(sawNewDelta);
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
resultData = new int[][] {{1, 2}, {3, 4}};
Assert.assertEquals(stringifyValues(resultData), rs);
rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
@@ -472,16 +482,15 @@ public class TestTxnCommands2 {
// There should be 1 new directory: base_xxxxxxx.
// Original bucket files and delta directory should stay until Cleaner kicks in.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(4, status.length);
boolean sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
}
}
Assert.assertTrue(sawNewBase);
@@ -495,13 +504,13 @@ public class TestTxnCommands2 {
// 5. Let Cleaner delete obsolete files/dirs
// Note, here we create a fake directory along with fake files as original directories/files
String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
- "/subdir/000000_0";
+ "/subdir/000000_0";
String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
- "/subdir/000000_1";
+ "/subdir/000000_1";
fs.create(new Path(fakeFile0));
fs.create(new Path(fakeFile1));
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// Before Cleaner, there should be 5 items:
// 2 original files, 1 original directory, 1 base directory and 1 delta directory
Assert.assertEquals(5, status.length);
@@ -509,13 +518,12 @@ public class TestTxnCommands2 {
// There should be only 1 directory left: base_xxxxxxx.
// Original bucket files and delta directory should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 2}, {3, 4}};
Assert.assertEquals(stringifyValues(resultData), rs);
@@ -525,7 +533,7 @@ public class TestTxnCommands2 {
}
/**
- * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+ * Test the query correctness and directory layout for ACID table conversion
* 1. Insert a row to Non-ACID table
* 2. Convert Non-ACID to ACID table
* 3. Update the existing row in ACID table
@@ -541,7 +549,7 @@ public class TestTxnCommands2 {
// 1. Insert a row to Non-ACID table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 original bucket files in the location (000000_0 and 000001_0)
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
@@ -550,14 +558,14 @@ public class TestTxnCommands2 {
List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
int [][] resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
int resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 2. Convert NONACIDORCTBL to ACID table
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// Everything should be same as before
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
@@ -566,29 +574,39 @@ public class TestTxnCommands2 {
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 2}};
Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 3. Update the existing row in newly-converted ACID table
runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
+ // and one delete_delta directory. When split-update is enabled, an update event is split into
+ // a combination of a delete and an insert, which generates the delete_delta directory.
// The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
- Assert.assertEquals(3, status.length);
+ // and so should the delete_delta directory.
+ Assert.assertEquals(4, status.length);
boolean sawNewDelta = false;
+ boolean sawNewDeleteDelta = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("delta_.*")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+ sawNewDeleteDelta = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
} else {
Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
}
}
Assert.assertTrue(sawNewDelta);
+ Assert.assertTrue(sawNewDeleteDelta);
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 3}};
Assert.assertEquals(stringifyValues(resultData), rs);
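Schematically, the single-row update exercised above fans out into two events on
disk - the "U = D + I" split the comments refer to. Directory names below follow
the delta_<min>_<max>_<stmt> pattern asserted later in this file; the transaction
id is shown as N for illustration:

  update nonacidorctbl set b = 3 where a = 1;   -- runs in transaction N
    -> delete_delta_N_N_0000/bucket_00001       delete event carrying the original ROW__ID
    -> delta_N_N_0000/bucket_00001              the re-inserted row (a=1, b=3) with a new ROW__ID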
@@ -602,8 +620,8 @@ public class TestTxnCommands2 {
// There should be 1 new directory: base_0000001.
// Original bucket files and delta directory should stay until Cleaner kicks in.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(4, status.length);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(5, status.length);
boolean sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
@@ -623,15 +641,15 @@ public class TestTxnCommands2 {
// 5. Let Cleaner delete obsolete files/dirs
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // Before Cleaner, there should be 4 items:
- // 2 original files, 1 delta directory and 1 base directory
- Assert.assertEquals(4, status.length);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // Before Cleaner, there should be 5 items:
+ // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
+ Assert.assertEquals(5, status.length);
runCleaner(hiveConf);
// There should be only 1 directory left: base_0000001.
- // Original bucket files and delta directory should have been cleaned up.
+ // Original bucket files, delta directory and delete_delta should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
@@ -646,7 +664,7 @@ public class TestTxnCommands2 {
}
/**
- * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+ * Test the query correctness and directory layout for ACID table conversion
* 1. Insert a row to Non-ACID table
* 2. Convert Non-ACID to ACID table
* 3. Perform Major compaction
@@ -663,7 +681,7 @@ public class TestTxnCommands2 {
// 1. Insert a row to Non-ACID table
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// There should be 2 original bucket files in the location (000000_0 and 000001_0)
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
@@ -676,10 +694,10 @@ public class TestTxnCommands2 {
int resultCount = 1;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
- // 2. Convert NONACIDORCTBL to ACID table
+ // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// Everything should be same as before
Assert.assertEquals(BUCKET_COUNT, status.length);
for (int i = 0; i < status.length; i++) {
@@ -698,7 +716,7 @@ public class TestTxnCommands2 {
// There should be 1 new directory: base_-9223372036854775808
// Original bucket files should stay until Cleaner kicks in.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(3, status.length);
boolean sawNewBase = false;
for (int i = 0; i < status.length; i++) {
@@ -722,12 +740,14 @@ public class TestTxnCommands2 {
runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
// There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
- // plus two new delta directories
- Assert.assertEquals(5, status.length);
+ // plus two new delta directories and one delete_delta directory that would be created due to
+ // the update statement (remember split-update U=D+I)!
+ Assert.assertEquals(6, status.length);
int numDelta = 0;
+ int numDeleteDelta = 0;
sawNewBase = false;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("delta_.*")) {
@@ -740,9 +760,17 @@ public class TestTxnCommands2 {
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numDelta == 2) {
Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
- Assert.assertEquals(BUCKET_COUNT, buckets.length);
- Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
- Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ }
+ } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+ numDeleteDelta++;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Arrays.sort(buckets);
+ if (numDeleteDelta == 1) {
+ Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName());
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
} else if (status[i].getPath().getName().matches("base_.*")) {
Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
@@ -755,11 +783,13 @@ public class TestTxnCommands2 {
}
}
Assert.assertEquals(2, numDelta);
+ Assert.assertEquals(1, numDeleteDelta);
Assert.assertTrue(sawNewBase);
+
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 3}, {3, 4}};
Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
@@ -767,11 +797,12 @@ public class TestTxnCommands2 {
runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
runWorker(hiveConf);
// There should be 1 new base directory: base_0000001
- // Original bucket files, delta directories and the previous base directory should stay until Cleaner kicks in.
+ // Original bucket files, delta directories, delete_delta directories and the
+ // previous base directory should stay until Cleaner kicks in.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(status);
- Assert.assertEquals(6, status.length);
+ Assert.assertEquals(7, status.length);
int numBase = 0;
for (int i = 0; i < status.length; i++) {
if (status[i].getPath().getName().matches("base_.*")) {
@@ -785,9 +816,8 @@ public class TestTxnCommands2 {
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir has two bucket files
Assert.assertEquals("base_0000023", status[i].getPath().getName());
- Assert.assertEquals(BUCKET_COUNT, buckets.length);
- Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
- Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
}
}
@@ -795,28 +825,27 @@ public class TestTxnCommands2 {
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 3}, {3, 4}};
Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
// 6. Let Cleaner delete obsolete files/dirs
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
// Before Cleaner, there should be 6 items:
- // 2 original files, 2 delta directories and 2 base directories
- Assert.assertEquals(6, status.length);
+ // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
+ Assert.assertEquals(7, status.length);
runCleaner(hiveConf);
// There should be only 1 directory left: base_0000001.
// Original bucket files, delta directories and previous base directory should have been cleaned up.
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(1, status.length);
Assert.assertEquals("base_0000023", status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(buckets);
- Assert.assertEquals(BUCKET_COUNT, buckets.length);
- Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
- Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
resultData = new int[][] {{1, 3}, {3, 4}};
Assert.assertEquals(stringifyValues(resultData), rs);
@@ -824,9 +853,6 @@ public class TestTxnCommands2 {
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
}
-
-
-
@Test
public void testValidTxnsBookkeeping() throws Exception {
// 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
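For orientation, the directory timeline that the third conversion test above walks
through, reconstructed from its assertions (a sketch, not an exhaustive listing of
file-system contents):

  nonacidorctbl/
    000000_0, 000001_0                               originals from the non-acid table
    base_-9223372036854775808/bucket_00001           first major compaction of the originals
    delete_delta_0000022_0000022_0000/bucket_00001   delete half of the update (U = D + I)
    delta_0000022_0000022_0000/bucket_00001          insert half of the update
    delta_0000023_0000023_0000/bucket_00001          the follow-up insert
    base_0000023/bucket_00001                        second major compaction; once the Cleaner
                                                     runs, only this directory remains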
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
deleted file mode 100644
index 520e958..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
+++ /dev/null
@@ -1,545 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-/**
- * Same as TestTxnCommands2 but tests ACID tables with 'transactional_properties' set to 'default'.
- * This tests whether ACID tables with split-update turned on are working correctly or not
- * for the same set of tests when it is turned off. Of course, it also adds a few tests to test
- * specific behaviors of ACID tables with split-update turned on.
- */
-public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
-
- public TestTxnCommands2WithSplitUpdate() {
- super();
- }
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- @Override
- @Before
- public void setUp() throws Exception {
- setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
- }
-
- @Override
- @Test
- public void testInitiatorWithMultipleFailedCompactions() throws Exception {
- // Test with split-update turned on.
- testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
- }
-
- @Override
- @Test
- public void writeBetweenWorkerAndCleaner() throws Exception {
- writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
- }
-
- @Override
- @Test
- public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
- testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
- }
-
- /**
- * In current implementation of ACID, altering the value of transactional_properties or trying to
- * set a value for previously unset value for an acid table will throw an exception.
- * @throws Exception
- */
- @Test
- public void testFailureOnAlteringTransactionalProperties() throws Exception {
- expectedException.expect(RuntimeException.class);
- expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
- runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
- runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
- }
- /**
- * Test the query correctness and directory layout for ACID table conversion with split-update
- * enabled.
- * 1. Insert a row to Non-ACID table
- * 2. Convert Non-ACID to ACID table with split-update enabled
- * 3. Insert a row to ACID table
- * 4. Perform Major compaction
- * 5. Clean
- * @throws Exception
- */
- @Test
- @Override
- public void testNonAcidToAcidConversion1() throws Exception {
- FileSystem fs = FileSystem.get(hiveConf);
- FileStatus[] status;
-
- // 1. Insert a row to Non-ACID table
- runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // There should be 2 original bucket files in the location (000000_0 and 000001_0)
- Assert.assertEquals(BUCKET_COUNT, status.length);
- for (int i = 0; i < status.length; i++) {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- int [][] resultData = new int[][] {{1, 2}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- int resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 2. Convert NONACIDORCTBL to ACID table
- runStatementOnDriver("alter table " + Table.NONACIDORCTBL
- + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // Everything should be same as before
- Assert.assertEquals(BUCKET_COUNT, status.length);
- for (int i = 0; i < status.length; i++) {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 2}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 3. Insert another row to newly-converted ACID table
- runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
- // The delta directory should also have only 1 bucket file (bucket_00001)
- Assert.assertEquals(3, status.length);
- boolean sawNewDelta = false;
- for (int i = 0; i < status.length; i++) {
- if (status[i].getPath().getName().matches("delta_.*")) {
- sawNewDelta = true;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(1, buckets.length); // only one bucket file
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
- } else {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- }
- Assert.assertTrue(sawNewDelta);
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
- resultData = new int[][] {{1, 2}, {3, 4}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 2;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 4. Perform a major compaction
- runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
- runWorker(hiveConf);
- // There should be 1 new directory: base_xxxxxxx.
- // Original bucket files and delta directory should stay until Cleaner kicks in.
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(4, status.length);
- boolean sawNewBase = false;
- for (int i = 0; i < status.length; i++) {
- if (status[i].getPath().getName().matches("base_.*")) {
- sawNewBase = true;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(1, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
- }
- }
- Assert.assertTrue(sawNewBase);
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 2}, {3, 4}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 2;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 5. Let Cleaner delete obsolete files/dirs
- // Note, here we create a fake directory along with fake files as original directories/files
- String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
- "/subdir/000000_0";
- String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
- "/subdir/000000_1";
- fs.create(new Path(fakeFile0));
- fs.create(new Path(fakeFile1));
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // Before Cleaner, there should be 5 items:
- // 2 original files, 1 original directory, 1 base directory and 1 delta directory
- Assert.assertEquals(5, status.length);
- runCleaner(hiveConf);
- // There should be only 1 directory left: base_xxxxxxx.
- // Original bucket files and delta directory should have been cleaned up.
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(1, status.length);
- Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
- FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(1, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 2}, {3, 4}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 2;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
- }
-
- /**
- * Test the query correctness and directory layout for ACID table conversion with split-update
- * enabled.
- * 1. Insert a row to Non-ACID table
- * 2. Convert Non-ACID to ACID table with split update enabled.
- * 3. Update the existing row in ACID table
- * 4. Perform Major compaction
- * 5. Clean
- * @throws Exception
- */
- @Test
- @Override
- public void testNonAcidToAcidConversion2() throws Exception {
- FileSystem fs = FileSystem.get(hiveConf);
- FileStatus[] status;
-
- // 1. Insert a row to Non-ACID table
- runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // There should be 2 original bucket files in the location (000000_0 and 000001_0)
- Assert.assertEquals(BUCKET_COUNT, status.length);
- for (int i = 0; i < status.length; i++) {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- int [][] resultData = new int[][] {{1, 2}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- int resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 2. Convert NONACIDORCTBL to ACID table
- runStatementOnDriver("alter table " + Table.NONACIDORCTBL
- + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // Everything should be same as before
- Assert.assertEquals(BUCKET_COUNT, status.length);
- for (int i = 0; i < status.length; i++) {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 2}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 3. Update the existing row in newly-converted ACID table
- runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
- // and one delete_delta directory. When split-update is enabled, an update event is split into
- // a combination of delete and insert, that generates the delete_delta directory.
- // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
- // and so should the delete_delta directory.
- Assert.assertEquals(4, status.length);
- boolean sawNewDelta = false;
- boolean sawNewDeleteDelta = false;
- for (int i = 0; i < status.length; i++) {
- if (status[i].getPath().getName().matches("delta_.*")) {
- sawNewDelta = true;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
- } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
- sawNewDeleteDelta = true;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
- } else {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- }
- Assert.assertTrue(sawNewDelta);
- Assert.assertTrue(sawNewDeleteDelta);
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 3}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 4. Perform a major compaction
- runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
- runWorker(hiveConf);
- // There should be 1 new directory: base_0000001.
- // Original bucket files and delta directory should stay until Cleaner kicks in.
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(5, status.length);
- boolean sawNewBase = false;
- for (int i = 0; i < status.length; i++) {
- if (status[i].getPath().getName().matches("base_.*")) {
- sawNewBase = true;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
- }
- }
- Assert.assertTrue(sawNewBase);
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 3}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 5. Let Cleaner delete obsolete files/dirs
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // Before Cleaner, there should be 5 items:
- // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
- Assert.assertEquals(5, status.length);
- runCleaner(hiveConf);
- // There should be only 1 directory left: base_0000001.
- // Original bucket files, delta directory and delete_delta should have been cleaned up.
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(1, status.length);
- Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
- FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 3}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
- }
-
- /**
- * Test the query correctness and directory layout for ACID table conversion with split-update
- * enabled.
- * 1. Insert a row to Non-ACID table
- * 2. Convert Non-ACID to ACID table with split-update enabled
- * 3. Perform Major compaction
- * 4. Insert a new row to ACID table
- * 5. Perform another Major compaction
- * 6. Clean
- * @throws Exception
- */
- @Test
- @Override
- public void testNonAcidToAcidConversion3() throws Exception {
- FileSystem fs = FileSystem.get(hiveConf);
- FileStatus[] status;
-
- // 1. Insert a row to Non-ACID table
- runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // There should be 2 original bucket files in the location (000000_0 and 000001_0)
- Assert.assertEquals(BUCKET_COUNT, status.length);
- for (int i = 0; i < status.length; i++) {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- int [][] resultData = new int[][] {{1, 2}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- int resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
- runStatementOnDriver("alter table " + Table.NONACIDORCTBL
- + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // Everything should be same as before
- Assert.assertEquals(BUCKET_COUNT, status.length);
- for (int i = 0; i < status.length; i++) {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 2}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 3. Perform a major compaction
- runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
- runWorker(hiveConf);
- // There should be 1 new directory: base_-9223372036854775808
- // Original bucket files should stay until Cleaner kicks in.
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(3, status.length);
- boolean sawNewBase = false;
- for (int i = 0; i < status.length; i++) {
- if (status[i].getPath().getName().matches("base_.*")) {
- Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
- sawNewBase = true;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
- }
- }
- Assert.assertTrue(sawNewBase);
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 2}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 1;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 4. Update the existing row, and insert another row to newly-converted ACID table
- runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
- runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
- // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
- // plus two new delta directories and one delete_delta directory that would be created due to
- // the update statement (remember split-update U=D+I)!
- Assert.assertEquals(6, status.length);
- int numDelta = 0;
- int numDeleteDelta = 0;
- sawNewBase = false;
- for (int i = 0; i < status.length; i++) {
- if (status[i].getPath().getName().matches("delta_.*")) {
- numDelta++;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Arrays.sort(buckets);
- if (numDelta == 1) {
- Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName());
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
- } else if (numDelta == 2) {
- Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
- Assert.assertEquals(1, buckets.length);
- Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
- }
- } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
- numDeleteDelta++;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Arrays.sort(buckets);
- if (numDeleteDelta == 1) {
- Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName());
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
- }
- } else if (status[i].getPath().getName().matches("base_.*")) {
- Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
- sawNewBase = true;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
- } else {
- Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
- }
- }
- Assert.assertEquals(2, numDelta);
- Assert.assertEquals(1, numDeleteDelta);
- Assert.assertTrue(sawNewBase);
-
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 3}, {3, 4}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 2;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 5. Perform another major compaction
- runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
- runWorker(hiveConf);
- // There should be 1 new base directory: base_0000023
- // Original bucket files, delta directories, delete_delta directories and the
- // previous base directory should stay until Cleaner kicks in.
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Arrays.sort(status);
- Assert.assertEquals(7, status.length);
- int numBase = 0;
- for (int i = 0; i < status.length; i++) {
- if (status[i].getPath().getName().matches("base_.*")) {
- numBase++;
- FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Arrays.sort(buckets);
- if (numBase == 1) {
- Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
- Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
- Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
- } else if (numBase == 2) {
- // The new base dir has a single bucket file (bucket_00001), like the delta dirs it was compacted from
- Assert.assertEquals("base_0000023", status[i].getPath().getName());
- Assert.assertEquals(1, buckets.length);
- Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
- }
- }
- }
- Assert.assertEquals(2, numBase);
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 3}, {3, 4}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 2;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
- // 6. Let Cleaner delete obsolete files/dirs
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- // Before Cleaner, there should be 7 items:
- // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
- Assert.assertEquals(7, status.length);
- runCleaner(hiveConf);
- // There should be only 1 directory left: base_0000023.
- // Original bucket files, delta directories, delete_delta directories and the previous base directory should have been cleaned up.
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
- (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
- Assert.assertEquals(1, status.length);
- Assert.assertEquals("base_0000023", status[0].getPath().getName());
- FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
- Arrays.sort(buckets);
- Assert.assertEquals(1, buckets.length);
- Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
- rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
- resultData = new int[][] {{1, 3}, {3, 4}};
- Assert.assertEquals(stringifyValues(resultData), rs);
- rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
- resultCount = 2;
- Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
- }
-}
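For readers tracing the directory-layout assertions in the deleted test above, here is a condensed, standalone sketch of the same classification logic. The name patterns and the listStatus idiom come from the test; the class and method names are invented for illustration:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AcidLayoutSketch {
      // Counts the four kinds of entries an ACID table directory can hold:
      // base_<txn> dirs from major compaction, delta_* dirs from inserts (and
      // the I half of split-update), delete_delta_* dirs from the D half, and
      // 00000N_0 original bucket files left from before the ACID conversion.
      public static void printLayout(FileSystem fs, Path tableDir) throws IOException {
        int bases = 0, deltas = 0, deleteDeltas = 0, originals = 0;
        for (FileStatus s : fs.listStatus(tableDir)) {
          String name = s.getPath().getName();
          if (name.matches("delete_delta_.*")) {
            deleteDeltas++;
          } else if (name.matches("delta_.*")) {
            deltas++;
          } else if (name.matches("base_.*")) {
            bases++;
          } else if (name.matches("00000[0-9]_0")) {
            originals++;
          }
        }
        System.out.printf("bases=%d deltas=%d delete_deltas=%d originals=%d%n",
            bases, deltas, deleteDeltas, originals);
      }
    }

After step 4 above (one update plus one insert against a freshly compacted table) this would print bases=1 deltas=2 delete_deltas=1 originals=2, the same 6 entries the test asserts.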
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
index 44a9412..c76d654 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
@@ -23,11 +23,11 @@ import org.junit.Before;
import org.junit.Test;
/**
- * Same as TestTxnCommands2WithSplitUpdate but tests ACID tables with vectorization turned on by
+ * Same as TestTxnCommands2 but tests ACID tables with vectorization turned on by
* default, and having 'transactional_properties' set to 'default'. This specifically tests the
* fast VectorizedOrcAcidRowBatchReader for ACID tables with split-update turned on.
*/
-public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2WithSplitUpdate {
+public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2 {
public TestTxnCommands2WithSplitUpdateAndVectorization() {
super();
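The one-line javadoc change above reflects a recurring pattern in these ACID tests: a subclass inherits every @Test method and only flips configuration. A minimal sketch of that pattern, assuming the parent exposes an overridable setUp() and a hiveConf field, and that vectorization is toggled via HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED (the class names here are hypothetical, not the actual classes in this patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical illustration: every test is inherited from the parent;
    // only the vectorization switch differs.
    public class MyTxnCommandsWithVectorization extends MyTxnCommands {
      @Override
      public void setUp() throws Exception {
        super.setUp();
        hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
      }
    }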
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 44ff65c..06e4f98 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
+import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.junit.Assert;
import org.junit.Test;
@@ -669,21 +670,12 @@ public class TestAcidUtils {
@Test
public void testAcidOperationalProperties() throws Exception {
- AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getLegacy();
- assertsForAcidOperationalProperties(testObj, "legacy");
-
- testObj = AcidUtils.AcidOperationalProperties.getDefault();
+ AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getDefault();
assertsForAcidOperationalProperties(testObj, "default");
- testObj = AcidUtils.AcidOperationalProperties.parseInt(0);
- assertsForAcidOperationalProperties(testObj, "legacy");
-
testObj = AcidUtils.AcidOperationalProperties.parseInt(1);
assertsForAcidOperationalProperties(testObj, "split_update");
- testObj = AcidUtils.AcidOperationalProperties.parseString("legacy");
- assertsForAcidOperationalProperties(testObj, "legacy");
-
testObj = AcidUtils.AcidOperationalProperties.parseString("default");
assertsForAcidOperationalProperties(testObj, "default");
@@ -699,12 +691,6 @@ public class TestAcidUtils {
assertEquals(1, testObj.toInt());
assertEquals("|split_update", testObj.toString());
break;
- case "legacy":
- assertEquals(false, testObj.isSplitUpdate());
- assertEquals(false, testObj.isHashBasedMerge());
- assertEquals(0, testObj.toInt());
- assertEquals("", testObj.toString());
- break;
default:
break;
}
@@ -716,7 +702,7 @@ public class TestAcidUtils {
Configuration testConf = new Configuration();
// Test setter for configuration object.
AcidUtils.setAcidOperationalProperties(testConf, oprProps);
- assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0));
+ assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, -1));
// Test getter for configuration object.
assertEquals(oprProps.toString(), AcidUtils.getAcidOperationalProperties(testConf).toString());
@@ -726,12 +712,15 @@ public class TestAcidUtils {
assertEquals(oprProps.toString(),
parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname));
// Test getter for map object.
- // Calling a get on the 'parameters' will still return legacy type because the setters/getters
- // for map work on different string keys. The setter will set the HIVE_TXN_OPERATIONAL_PROPERTIES
- // while the getter will try to read the key TABLE_TRANSACTIONAL_PROPERTIES.
- assertEquals(0, AcidUtils.getAcidOperationalProperties(parameters).toInt());
+ assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, oprProps.toString());
// Set the appropriate key in the map and test that we are able to read it back correctly.
assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
}
+
+ /**
+ * See {@link TestOrcRawRecordMerger#testGetLogicalLength()}
+ */
+ public void testGetLogicalLength() throws Exception {
+ }
}
\ No newline at end of file
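With the legacy variant removed, the surviving assertions pin down the acid 2.0 round-trip: "default" now implies split_update. A condensed sketch using only the calls exercised above; the values mirror the test's assertions:

    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AcidPropsSketch {
      public static void main(String[] args) {
        AcidUtils.AcidOperationalProperties p =
            AcidUtils.AcidOperationalProperties.getDefault();
        // acid 2.0 default includes split-update: toInt() == 1, "|split_update"
        assert p.isSplitUpdate();
        assert p.toInt() == 1;
        assert "|split_update".equals(p.toString());

        // parseInt and parseString arrive at the same state.
        assert AcidUtils.AcidOperationalProperties.parseInt(1).isSplitUpdate();
        assert AcidUtils.AcidOperationalProperties.parseString("default").toInt() == 1;
      }
    }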
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index b004cf5..53bd08c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -75,12 +75,14 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -597,12 +599,13 @@ public class TestInputOutputFormat {
@Test
public void testACIDSplitStrategy() throws Exception {
conf.set("bucket_count", "2");
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
MockFileSystem fs = new MockFileSystem(conf,
- new MockFile("mock:/a/delta_000_001/part-00", 1000, new byte[1], new MockBlock("host1")),
- new MockFile("mock:/a/delta_000_001/part-01", 1000, new byte[1], new MockBlock("host1")),
- new MockFile("mock:/a/delta_001_002/part-02", 1000, new byte[1], new MockBlock("host1")),
- new MockFile("mock:/a/delta_001_002/part-03", 1000, new byte[1], new MockBlock("host1")));
+ new MockFile("mock:/a/delta_000_001/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delta_000_001/bucket_000001", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delta_001_002/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delta_001_002/bucket_000001", 1000, new byte[1], new MockBlock("host1")));
OrcInputFormat.FileGenerator gen =
new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a"), false, null);
@@ -611,9 +614,9 @@ public class TestInputOutputFormat {
List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
ColumnarSplitSizeEstimator splitSizeEstimator = new ColumnarSplitSizeEstimator();
for (OrcSplit split: splits) {
- assertEquals(Integer.MAX_VALUE, splitSizeEstimator.getEstimatedSize(split));
+ assertEquals(1, splitSizeEstimator.getEstimatedSize(split));
}
- assertEquals(2, splits.size());
+ assertEquals(4, splits.size());
}
@Test
@@ -1105,6 +1108,9 @@ public class TestInputOutputFormat {
}
}
+ /**
+ * NOTE: delete(Path...) removes only a single file; recursive delete(Path, true) is not supported.
+ */
public static class MockFileSystem extends FileSystem {
final List<MockFile> files = new ArrayList<MockFile>();
final Map<MockFile, FileStatus> fileStatusMap = new HashMap<>();
@@ -1230,14 +1236,32 @@ public class TestInputOutputFormat {
public boolean delete(Path path) throws IOException {
statistics.incrementWriteOps(1);
checkAccess();
- return false;
+ int removed = 0;
+ for(int i = 0; i < files.size(); i++) {
+ MockFile mf = files.get(i);
+ if(path.equals(mf.path)) {
+ files.remove(i);
+ removed++;
+ break;
+ }
+ }
+ for(int i = 0; i < globalFiles.size(); i++) {
+ MockFile mf = globalFiles.get(i);
+ if(path.equals(mf.path)) {
+ globalFiles.remove(i);
+ removed++;
+ break;
+ }
+ }
+ return removed > 0;
}
@Override
public boolean delete(Path path, boolean b) throws IOException {
- statistics.incrementWriteOps(1);
- checkAccess();
- return false;
+ if(b) {
+ throw new UnsupportedOperationException();
+ }
+ return delete(path);
}
@Override
@@ -2690,9 +2714,11 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktable
- // call-2: open - mock:/mocktable/0_0
- // call-3: open - mock:/mocktable/0_1
- assertEquals(3, readOpsDelta);
+ // call-2: check existence of side file for mock:/mocktable/0_0
+ // call-3: open - mock:/mocktable/0_0
+ // call-4: check existence of side file for mock:/mocktable/0_1
+ // call-5: open - mock:/mocktable/0_1
+ assertEquals(5, readOpsDelta);
assertEquals(2, splits.length);
// revert back to local fs
@@ -2748,9 +2774,11 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktbl
- // call-2: open - mock:/mocktbl/0_0
- // call-3: open - mock:/mocktbl/0_1
- assertEquals(3, readOpsDelta);
+ // call-2: check existence of side file for mock:/mocktbl/0_0
+ // call-3: open - mock:/mocktbl/0_0
+ // call-4: check existence of side file for mock:/mocktbl/0_1
+ // call-5: open - mock:/mocktbl/0_1
+ assertEquals(5, readOpsDelta);
// force BI to avoid reading footers
conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
@@ -2768,7 +2796,9 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktbl
- assertEquals(1, readOpsDelta);
+ // call-2: check existence of side file for mock:/mocktbl/0_0
+ // call-3: check existence of side file for mock:/mocktbl/0_1
+ assertEquals(3, readOpsDelta);
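Every readOpsDelta assertion in this class follows the same accounting pattern: snapshot the mock scheme's read counter, run split generation or the record reader, and diff. A minimal sketch of that pattern (the statistics idiom is verbatim from this test class):

    // Snapshot read ops for the "mock" filesystem before the operation.
    int readOpsBefore = -1;
    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
      if (statistics.getScheme().equalsIgnoreCase("mock")) {
        readOpsBefore = statistics.getReadOps();
      }
    }

    // ... run orcInputFormat.getSplits(conf, N) or getRecordReader(...) here ...

    // Diff the counter afterwards. With the transactional flag set, each ORC
    // file costs an extra op: the reader probes for a side (flush-length)
    // file before opening the data file, which is why the 3-op baselines
    // above became 5.
    int readOpsDelta = -1;
    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
      if (statistics.getScheme().equalsIgnoreCase("mock")) {
        readOpsDelta = statistics.getReadOps() - readOpsBefore;
      }
    }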
// enable cache and use default strategy
conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb");
@@ -2787,9 +2817,11 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktbl
- // call-2: open - mock:/mocktbl/0_0
- // call-3: open - mock:/mocktbl/0_1
- assertEquals(3, readOpsDelta);
+ // call-2: check existence of side file for mock:/mocktbl/0_0
+ // call-3: open - mock:/mocktbl/0_0
+ // call-4: check existence of side file for mock:/mocktbl/0_1
+ // call-5: open - mock:/mocktbl/0_1
+ assertEquals(5, readOpsDelta);
for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -2859,9 +2891,11 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktable
- // call-2: open - mock:/mocktbl1/0_0
- // call-3: open - mock:/mocktbl1/0_1
- assertEquals(3, readOpsDelta);
+ // call-2: check side file for mock:/mocktbl1/0_0
+ // call-3: open - mock:/mocktbl1/0_0
+ // call-4: check side file for mock:/mocktbl1/0_1
+ // call-5: open - mock:/mocktbl1/0_1
+ assertEquals(5, readOpsDelta);
// change file length and look for cache misses
@@ -2898,9 +2932,11 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktable
- // call-2: open - mock:/mocktbl1/0_0
- // call-3: open - mock:/mocktbl1/0_1
- assertEquals(3, readOpsDelta);
+ // call-2: check side file for mock:/mocktbl1/0_0
+ // call-3: open - mock:/mocktbl1/0_0
+ // call-4: check side file for mock:/mocktbl1/0_1
+ // call-5: open - mock:/mocktbl1/0_1
+ assertEquals(5, readOpsDelta);
for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -2971,9 +3007,11 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktbl2
- // call-2: open - mock:/mocktbl2/0_0
- // call-3: open - mock:/mocktbl2/0_1
- assertEquals(3, readOpsDelta);
+ // call-2: check side file for mock:/mocktbl2/0_0
+ // call-3: open - mock:/mocktbl2/0_0
+ // call-4: check side file for mock:/mocktbl2/0_1
+ // call-5: open - mock:/mocktbl2/0_1
+ assertEquals(5, readOpsDelta);
// change file modification time and look for cache misses
FileSystem fs1 = FileSystem.get(conf);
@@ -2993,8 +3031,9 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktbl2
- // call-2: open - mock:/mocktbl2/0_1
- assertEquals(2, readOpsDelta);
+ // call-2: check side file for mock:/mocktbl2/0_1
+ // call-3: open - mock:/mocktbl2/0_1
+ assertEquals(3, readOpsDelta);
// touch the next file
fs1 = FileSystem.get(conf);
@@ -3014,8 +3053,9 @@ public class TestInputOutputFormat {
}
}
// call-1: listLocatedStatus - mock:/mocktbl2
- // call-2: open - mock:/mocktbl2/0_0
- assertEquals(2, readOpsDelta);
+ // call-2: check side file for mock:/mocktbl2/0_0
+ // call-3: open - mock:/mocktbl2/0_0
+ assertEquals(3, readOpsDelta);
for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3332,6 +3372,7 @@ public class TestInputOutputFormat {
MockFileSystem fs = new MockFileSystem(conf);
MockPath mockPath = new MockPath(fs, "mock:///mocktable5");
conf.set("hive.transactional.table.scan", "true");
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
conf.set("hive.orc.splits.include.file.footer", "false");
@@ -3409,6 +3450,7 @@ public class TestInputOutputFormat {
MockFileSystem fs = new MockFileSystem(conf);
MockPath mockPath = new MockPath(fs, "mock:///mocktable6");
conf.set("hive.transactional.table.scan", "true");
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
conf.set("hive.orc.splits.include.file.footer", "true");
@@ -3481,15 +3523,14 @@ public class TestInputOutputFormat {
@Test
public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception {
- MockFileSystem fs = new MockFileSystem(conf);
+ conf.set("fs.defaultFS", "mock:///");
+ conf.set("fs.mock.impl", MockFileSystem.class.getName());
+ FileSystem fs = FileSystem.get(conf);
MockPath mockPath = new MockPath(fs, "mock:///mocktable7");
- conf.set("hive.transactional.table.scan", "true");
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
conf.set("hive.orc.splits.include.file.footer", "false");
conf.set("mapred.input.dir", mockPath.toString());
- conf.set("fs.defaultFS", "mock:///");
- conf.set("fs.mock.impl", MockFileSystem.class.getName());
StructObjectInspector inspector;
synchronized (TestOrcFile.class) {
inspector = (StructObjectInspector)
@@ -3505,17 +3546,22 @@ public class TestInputOutputFormat {
}
writer.close();
- writer = OrcFile.createWriter(new Path(new Path(mockPath + "/delta_001_002") + "/0_1"),
- OrcFile.writerOptions(conf).blockPadding(false)
- .bufferSize(1024).inspector(inspector));
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).bucket(1).minimumTransactionId(1)
+ .maximumTransactionId(1).inspector(inspector).finalDestination(mockPath);
+ OrcOutputFormat of = new OrcOutputFormat();
+ RecordUpdater ru = of.getRecordUpdater(mockPath, options);
for (int i = 0; i < 10; ++i) {
- writer.addRow(new MyRow(i, 2 * i));
+ ru.insert(options.getMinimumTransactionId(), new MyRow(i, 2 * i));
}
- writer.close();
+ ru.close(false);//this deletes the side file
+
+ //set up props for read
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+ AcidUtils.setTransactionalTableScan(conf, true);
OrcInputFormat orcInputFormat = new OrcInputFormat();
InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
- assertEquals(1, splits.length);
+ assertEquals(2, splits.length);
int readOpsBefore = -1;
for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3530,14 +3576,8 @@ public class TestInputOutputFormat {
assertTrue(split.toString().contains("start=3"));
assertTrue(split.toString().contains("hasFooter=false"));
assertTrue(split.toString().contains("hasBase=true"));
- // NOTE: don't be surprised if deltas value is different
- // in older release deltas=2 as min and max transaction are added separately to delta list.
- // in newer release since both of them are put together deltas=1
- assertTrue(split.toString().contains("deltas=1"));
- if (split instanceof OrcSplit) {
- assertFalse("No footer serialize test for ACID reader, hasFooter is not expected in" +
- " orc splits.", ((OrcSplit) split).hasFooter());
- }
+ assertFalse("No footer serialize test for ACID reader, hasFooter is not expected in" +
+ " orc splits.", ((OrcSplit) split).hasFooter());
orcInputFormat.getRecordReader(split, conf, Reporter.NULL);
}
@@ -3547,11 +3587,9 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: open to read footer - split 1 => mock:/mocktable7/0_0
- // call-2: open to read data - split 1 => mock:/mocktable7/0_0
- // call-3: open side file (flush length) of delta directory
- // call-4: fs.exists() check for delta_xxx_xxx/bucket_00000 file
- // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable7/0_0
+ // call-1: open to read data - split 1 => mock:/mocktable7/0_0
+ // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
+ // call-3: split 2 - read delta_x_y/bucket_00001
assertEquals(5, readOpsDelta);
// revert back to local fs
@@ -3560,15 +3598,14 @@ public class TestInputOutputFormat {
@Test
public void testACIDReaderFooterSerializeWithDeltas() throws Exception {
- MockFileSystem fs = new MockFileSystem(conf);
+ conf.set("fs.defaultFS", "mock:///");
+ conf.set("fs.mock.impl", MockFileSystem.class.getName());
+ FileSystem fs = FileSystem.get(conf);//ensures that FS object is cached so that everyone uses the same instance
MockPath mockPath = new MockPath(fs, "mock:///mocktable8");
- conf.set("hive.transactional.table.scan", "true");
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
conf.set("hive.orc.splits.include.file.footer", "true");
conf.set("mapred.input.dir", mockPath.toString());
- conf.set("fs.defaultFS", "mock:///");
- conf.set("fs.mock.impl", MockFileSystem.class.getName());
StructObjectInspector inspector;
synchronized (TestOrcFile.class) {
inspector = (StructObjectInspector)
@@ -3584,17 +3621,22 @@ public class TestInputOutputFormat {
}
writer.close();
- writer = OrcFile.createWriter(new Path(new Path(mockPath + "/delta_001_002") + "/0_1"),
- OrcFile.writerOptions(conf).blockPadding(false)
- .bufferSize(1024).inspector(inspector));
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).bucket(1).minimumTransactionId(1)
+ .maximumTransactionId(1).inspector(inspector).finalDestination(mockPath);
+ OrcOutputFormat of = new OrcOutputFormat();
+ RecordUpdater ru = of.getRecordUpdater(mockPath, options);
for (int i = 0; i < 10; ++i) {
- writer.addRow(new MyRow(i, 2 * i));
+ ru.insert(options.getMinimumTransactionId(), new MyRow(i, 2 * i));
}
- writer.close();
+ ru.close(false);//this deletes the side file
+
+ //set up props for read
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+ AcidUtils.setTransactionalTableScan(conf, true);
OrcInputFormat orcInputFormat = new OrcInputFormat();
InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
- assertEquals(1, splits.length);
+ assertEquals(2, splits.length);
int readOpsBefore = -1;
for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3609,14 +3651,8 @@ public class TestInputOutputFormat {
assertTrue(split.toString().contains("start=3"));
assertTrue(split.toString().contains("hasFooter=true"));
assertTrue(split.toString().contains("hasBase=true"));
- // NOTE: don't be surprised if deltas value is different
- // in older release deltas=2 as min and max transaction are added separately to delta list.
- // in newer release since both of them are put together deltas=1
- assertTrue(split.toString().contains("deltas=1"));
- if (split instanceof OrcSplit) {
- assertTrue("Footer serialize test for ACID reader, hasFooter is not expected in" +
- " orc splits.", ((OrcSplit) split).hasFooter());
- }
+ assertTrue("Footer serialize test for ACID reader, hasFooter is not expected in" +
+ " orc splits.", ((OrcSplit) split).hasFooter());
orcInputFormat.getRecordReader(split, conf, Reporter.NULL);
}
@@ -3627,10 +3663,9 @@ public class TestInputOutputFormat {
}
}
// call-1: open to read data - split 1 => mock:/mocktable8/0_0
- // call-2: open side file (flush length) of delta directory
- // call-3: fs.exists() check for delta_xxx_xxx/bucket_00000 file
- // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable8/0_0
- assertEquals(4, readOpsDelta);
+ // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
+ // call-3: split 2 - read delta_x_y/bucket_00001
+ assertEquals(3, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");